[sdc] update to the current code base
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / CambriaHandler.java
index 6738b87..e6d15b8 100644 (file)
@@ -21,6 +21,7 @@
 package org.openecomp.sdc.be.components.distribution.engine;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,8 +32,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.att.nsa.cambria.client.*;
 import org.apache.http.HttpStatus;
 import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
 import org.openecomp.sdc.common.config.EcompErrorName;
 import org.slf4j.Logger;
@@ -40,16 +43,12 @@ import org.slf4j.LoggerFactory;
 
 import com.att.nsa.apiClient.http.HttpException;
 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.att.nsa.cambria.client.CambriaIdentityManager;
 import com.att.nsa.cambria.client.CambriaPublisher.message;
-import com.att.nsa.cambria.client.CambriaTopicManager;
 import com.google.gson.Gson;
 
 import fj.data.Either;
@@ -63,6 +62,8 @@ public class CambriaHandler {
 
        private Gson gson = new Gson();
 
+       public static boolean useHttpsWithDmaap = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().isUseHttpsWithDmaap();
+
        public static void main(String[] args) {
 
                // String userBodyJson ="{\"artifactName\":\"myartifact\",
@@ -86,18 +87,18 @@ public class CambriaHandler {
                        String key = "sSJc5qiBnKy2qrlc";
                        String secret = "4ZRPzNJfEUK0sSNBvccd2m7X";
 
-                       createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build();
+                       createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret));
 
                        String topicName = "ASDC-DISTR-NOTIF-TOPIC-PRODesofer";
 
                        String clientKey1 = "CGGoorrGPXPx2B1C";
                        String clientSecret1 = "OTHk2mcCSbskEtHhDw8h5oUa";
 
-                       CambriaTopicManager createStatusTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build();
+                       CambriaTopicManager createStatusTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret));
                        String reportTopic = "ASDC-DISTR-STATUS-TOPIC-PRODESOFER";
                        createStatusTopicManager.allowProducer(reportTopic, clientKey1);
 
-                       CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps().usingHosts(servers).build();
+                       CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
                        createSimplePublisher.setApiCredentials(clientKey1, clientSecret1);
 
                        DistributionStatusNotification distributionStatusNotification = new DistributionStatusNotification();
@@ -126,20 +127,20 @@ public class CambriaHandler {
 
                        String clientKey2 = "TAIEPO0aDU4VzM0G";
 
-                       CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps().usingHosts(servers).build();
+                       CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
                        createConsumer1.setApiCredentials(clientKey1, "OTHk2mcCSbskEtHhDw8h5oUa");
 
                        createTopicManager.allowConsumer(topicName, clientKey1);
 
                        CambriaConsumer createConsumer2 = null;
                        if (true) {
-                               createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps().usingHosts(servers).build();
+                               createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
                                createConsumer2.setApiCredentials(clientKey2, "FSlNJbmGWWBvBLJetQMYxPP6");
 
                                createTopicManager.allowConsumer(topicName, clientKey2);
                        }
 
-                       createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(servers).build();
+                       createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
                        createSimplePublisher.setApiCredentials(key, secret);
                        createTopicManager.allowProducer(topicName, key);
 
@@ -284,7 +285,7 @@ public class CambriaHandler {
                CambriaTopicManager createTopicManager = null;
                try {
 
-                       createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).build();
+                       createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
 
                        Set<String> topics = createTopicManager.getTopics();
 
@@ -318,7 +319,6 @@ public class CambriaHandler {
         * 
         * set Cambria status and http code in case we succeed to fetch it
         * 
-        * @param errorMessage
         * @return
         */
        private CambriaErrorResponse processError(Exception e) {
@@ -415,8 +415,6 @@ public class CambriaHandler {
         *            - list of U-EB servers
         * @param apiKey
         * @param secretKey
-        * @param topicsList
-        *            - list of exists topics
         * @param topicName
         *            - topic to create
         * @param partitionCount
@@ -428,7 +426,7 @@ public class CambriaHandler {
                CambriaTopicManager createTopicManager = null;
                try {
 
-                       createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build();
+                       createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey));
 
                        createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
 
@@ -458,7 +456,7 @@ public class CambriaHandler {
        public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
                CambriaTopicManager createTopicManager = null;
                try {
-                       createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
+                       createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
 
                        if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
                                createTopicManager.revokeProducer(topicName, subscriberApiKey);
@@ -513,7 +511,7 @@ public class CambriaHandler {
 
                CambriaTopicManager createTopicManager = null;
                try {
-                       createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
+                       createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
 
                        if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
                                createTopicManager.allowProducer(topicName, subscriberApiKey);
@@ -567,7 +565,7 @@ public class CambriaHandler {
         */
        public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
 
-               CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps().usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
+               CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
                consumer.setApiCredentials(apiKey, secretKey);
                return consumer;
        }
@@ -638,7 +636,7 @@ public class CambriaHandler {
                        String json = gson.toJson(data);
                        logger.trace("Before sending notification data {} to topic {}", json, topicName);
 
-                       createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build();
+                       createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
                        createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
 
                        int result = createSimplePublisher.send(PARTITION_KEY, json);
@@ -700,7 +698,7 @@ public class CambriaHandler {
                        String json = gson.toJson(data);
                        logger.debug("Before sending notification data {} to topic {}", json, topicName);
 
-                       createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build();
+                       createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
                        createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
 
                        int result = createSimplePublisher.send(PARTITION_KEY, json);
@@ -761,7 +759,7 @@ public class CambriaHandler {
                hostSet.add(server);
                CambriaIdentityManager createIdentityManager = null;
                try {
-                       createIdentityManager = new IdentityManagerBuilder().usingHttps().usingHosts(hostSet).build();
+                       createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
                        createIdentityManager.getApiKey(apiKey);
                        response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
 
@@ -775,4 +773,10 @@ public class CambriaHandler {
                return response;
        }
 
+       private static <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
+               if (useHttpsWithDmaap) {
+                       client.usingHttps();
+               }
+               return (T)client.build();
+       }
 }