* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
package org.openecomp.sdc.be.components.distribution.engine;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.http.HttpStatus;
-import org.openecomp.sdc.be.config.BeEcompErrorManager;
-import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
-import org.openecomp.sdc.common.config.EcompErrorName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.att.nsa.apiClient.credentials.ApiCredential;
import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClient;
import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
-import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaIdentityManager;
import com.att.nsa.cambria.client.CambriaPublisher.message;
import com.att.nsa.cambria.client.CambriaTopicManager;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
-
import fj.data.Either;
-import jline.internal.Log;
-
-public class CambriaHandler {
-
- private static Logger logger = LoggerFactory.getLogger(CambriaHandler.class.getName());
-
- public static String PARTITION_KEY = "asdc" + "aa";
-
- private Gson gson = new Gson();
-
- public static void main(String[] args) {
-
- // String userBodyJson ="{\"artifactName\":\"myartifact\",
- // \"artifactType\":\"MURANO-PKG\",
- // \"artifactDescription\":\"description\",
- // \"payloadData\":\"UEsDBAoAAAAIAAeLb0bDQz\", \"Content-MD5\":
- // \"YTg2Mjg4MWJhNmI5NzBiNzdDFkMWI=\" }";
- // System.out.println(userBodyJson);
- // String encodeBase64Str = GeneralUtililty.calculateMD5 (userBodyJson);
- // System.out.println(encodeBase64Str);
-
- CambriaTopicManager createTopicManager = null;
- try {
- List<String> servers = new ArrayList<String>();
- // servers.add("uebsb91kcdc.it.sdc.com:3904");
- // servers.add("uebsb92kcdc.it.sdc.com:3904");
- // servers.add("uebsb93kcdc.it.sdc.com:3904");
- servers.add("uebsb91sfdc.it.att.com:3904");
- servers.add("uebsb92sfdc.it.att.com:3904");
-
- String key = "sSJc5qiBnKy2qrlc";
- String secret = "4ZRPzNJfEUK0sSNBvccd2m7X";
-
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build();
-
- String topicName = "ASDC-DISTR-NOTIF-TOPIC-PRODesofer";
-
- String clientKey1 = "CGGoorrGPXPx2B1C";
- String clientSecret1 = "OTHk2mcCSbskEtHhDw8h5oUa";
-
- CambriaTopicManager createStatusTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build();
- String reportTopic = "ASDC-DISTR-STATUS-TOPIC-PRODESOFER";
- createStatusTopicManager.allowProducer(reportTopic, clientKey1);
-
- CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps().usingHosts(servers).build();
- createSimplePublisher.setApiCredentials(clientKey1, clientSecret1);
-
- DistributionStatusNotification distributionStatusNotification = new DistributionStatusNotification();
- distributionStatusNotification.setStatus(DistributionStatusNotificationEnum.DEPLOY_OK);
- distributionStatusNotification.setArtifactURL("Ssssssss url");
- distributionStatusNotification.setDistributionID("idddddddddddddd");
- distributionStatusNotification.setTimestamp(System.currentTimeMillis());
- distributionStatusNotification.setConsumerID("my consumer id");
-
- Gson gson = new Gson();
- int result = createSimplePublisher.send(PARTITION_KEY, gson.toJson(distributionStatusNotification));
-
- List<message> messagesInQ = createSimplePublisher.close(20, TimeUnit.SECONDS);
- System.out.println(messagesInQ == null ? 0 : messagesInQ.size());
-
- // createTopicManager.createTopic(topicName, "my test topic", 1, 1);
-
- /*
- *
- * { "secret": "OTHk2mcCSbskEtHhDw8h5oUa", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "CGGoorrGPXPx2B1C" }
- *
- *
- * { "secret": "FSlNJbmGWWBvBLJetQMYxPP6", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "TAIEPO0aDU4VzM0G" }
- *
- */
-
- String clientKey2 = "TAIEPO0aDU4VzM0G";
-
- CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps().usingHosts(servers).build();
- createConsumer1.setApiCredentials(clientKey1, "OTHk2mcCSbskEtHhDw8h5oUa");
-
- createTopicManager.allowConsumer(topicName, clientKey1);
-
- CambriaConsumer createConsumer2 = null;
- if (true) {
- createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps().usingHosts(servers).build();
- createConsumer2.setApiCredentials(clientKey2, "FSlNJbmGWWBvBLJetQMYxPP6");
-
- createTopicManager.allowConsumer(topicName, clientKey2);
- }
-
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(servers).build();
- createSimplePublisher.setApiCredentials(key, secret);
- createTopicManager.allowProducer(topicName, key);
-
- createSimplePublisher.send("aaaa", "{ my testttttttttttttttt }");
-
- while (true) {
-
- Iterable<String> fetch1 = createConsumer1.fetch();
-
- Iterator<String> iterator1 = fetch1.iterator();
- while (iterator1.hasNext()) {
- System.out.println("***********************************************");
- System.out.println("client 1" + iterator1.next());
- System.out.println("***********************************************");
- }
-
- if (createConsumer2 != null) {
- Iterable<String> fetch2 = createConsumer2.fetch();
-
- Iterator<String> iterator2 = fetch2.iterator();
- while (iterator2.hasNext()) {
- System.out.println("***********************************************");
- System.out.println("client 2" + iterator2.next());
- System.out.println("***********************************************");
- }
- }
- Thread.sleep(1000 * 20);
- }
-
- // createTopicManager = CambriaClientFactory.createTopicManager(
- // servers, "8F3MDAtMSBwwpSMy", "gzFmsTxSCtO5RQfAccM6PqqX");
-
- // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD");
- // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD1");
-
- // CambriaIdentityManager createIdentityManager =
- // CambriaClientFactory.createIdentityManager(null, null, null);
- // createIdentityManager.setApiCredentials(arg0, arg1);
- // createIdentityManager.cl
-
- // String topicName = " ";
- // createTopicManager.createTopic(topicName,
- // "ASDC distribution notification topic", 1, 1);
- //
- // Thread.sleep(10 * 1000);
- //
- // for (int i = 0; i < 5; i++) {
- // try {
- // boolean openForProducing = createTopicManager
- // .isOpenForProducing(topicName);
- //
- // System.out.println("openForProducing=" + openForProducing);
- // createTopicManager.allowProducer(topicName,
- // "8F3MDAtMSBwwpSMy");
- // Set<String> allowedProducers = createTopicManager
- // .getAllowedProducers(topicName);
- // System.out.println(allowedProducers);
- //
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- // }
-
- // createTopicManager.createTopic("", "", 0, 0);
- // createTopicManager.allowProducer(arg0, arg1);
- // createTopicManager.getTopics();
- // createTopicManager.close();
- // CambriaClientFactory.
- // CambriaBatchingPublisher createSimplePublisher =
- // CambriaClientFactory.createSimplePublisher("hostlist", "topic");
-
- // CambriaIdentityManager createIdentityManager =
- // CambriaClientFactory.createIdentityManager(null, "apiKey",
- // "apiSecret");
- // createIdentityManager.
-
- } catch (Exception e) {
- Log.debug("Exception in main test of Cambria Handler: {}", e.getMessage(), e);
- e.printStackTrace();
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
- }
-
- /**
- * process the response error from Cambria client
- *
- * @param message
- * @return
- */
- private Integer processMessageException(String message) {
-
- String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" };
-
- Integer result = checkPattern(patterns[0], message, 2);
- if (result != null) {
- return result;
- }
- result = checkPattern(patterns[1], message, 2);
-
- return result;
-
- }
-
- /**
- * check whether the message has a match with a given pattern inside it
- *
- * @param patternStr
- * @param message
- * @param groupIndex
- * @return
- */
- private Integer checkPattern(String patternStr, String message, int groupIndex) {
- Integer result = null;
-
- Pattern pattern = Pattern.compile(patternStr);
- Matcher matcher = pattern.matcher(message);
- boolean find = matcher.find();
- if (find) {
- String httpCode = matcher.group(groupIndex);
- if (httpCode != null) {
- try {
- result = Integer.valueOf(httpCode);
- } catch (NumberFormatException e) {
- logger.debug("Failed to parse http code {}", httpCode);
- }
- }
- }
- return result;
- }
-
- /**
- * retrieve all topics from U-EB server
- *
- * @param hostSet
- * @return
- */
- public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
-
- CambriaTopicManager createTopicManager = null;
- try {
-
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).build();
-
- Set<String> topics = createTopicManager.getTopics();
-
- if (topics == null || true == topics.isEmpty()) {
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
- return Either.right(cambriaErrorResponse);
- }
-
- return Either.left(topics);
-
- } catch (IOException | GeneralSecurityException e) {
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- logger.debug("Failed to fetch topics from U-EB server", e);
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics");
-
- return Either.right(cambriaErrorResponse);
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
-
- }
-
- /**
- * process the error message from Cambria client.
- *
- * set Cambria status and http code in case we succeed to fetch it
- *
- * @param errorMessage
- * @return
- */
- private CambriaErrorResponse processError(Exception e) {
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
-
- Integer httpCode = processMessageException(e.getMessage());
-
- if (httpCode != null) {
- cambriaErrorResponse.setHttpCode(httpCode);
- switch (httpCode.intValue()) {
-
- case 401:
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
- break;
- case 409:
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
- break;
- case 500:
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
- break;
- default:
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
- }
- } else {
-
- boolean found = false;
- Throwable throwable = e.getCause();
- if (throwable != null) {
- String message = throwable.getMessage();
-
- Throwable cause = throwable.getCause();
-
- if (cause != null) {
- Class<?> clazz = cause.getClass();
- String className = clazz.getName();
- if (className.endsWith("UnknownHostException")) {
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
- cambriaErrorResponse.addVariable(message);
- found = true;
- }
- }
- }
-
- if (false == found) {
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
- cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
- }
- }
-
- return cambriaErrorResponse;
- }
-
- /**
- * write the error to the log
- *
- * @param cambriaErrorResponse
- * @param errorMessage
- * @param methodName
- * @param operationDesc
- */
- private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) {
-
- String httpCode = (cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode()));
-
- switch (cambriaErrorResponse.getOperationStatus()) {
- case UNKNOWN_HOST_ERROR:
- String hostname = cambriaErrorResponse.getVariables().get(0);
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebUnkownHostError, methodName, hostname);
- BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
- break;
- case AUTHENTICATION_ERROR:
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebAuthenticationError, methodName, httpCode);
- BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
- break;
- case CONNNECTION_ERROR:
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebConnectionError, methodName, httpCode);
- BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
- break;
-
- case INTERNAL_SERVER_ERROR:
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, methodName, operationDesc);
- BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
- break;
-
- }
-
- }
-
- /**
- * create a topic if it does not exists in the topicsList
- *
- * @param hostSet
- * - list of U-EB servers
- * @param apiKey
- * @param secretKey
- * @param topicsList
- * - list of exists topics
- * @param topicName
- * - topic to create
- * @param partitionCount
- * @param replicationCount
- * @return
- */
- public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
-
- CambriaTopicManager createTopicManager = null;
- try {
-
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build();
-
- createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
-
- } catch (HttpException | IOException | GeneralSecurityException e) {
-
- logger.debug("Failed to create topic {}", topicName, e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic");
- }
-
- return cambriaErrorResponse;
-
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
- return new CambriaErrorResponse(CambriaOperationStatus.OK);
-
- }
-
- public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
- CambriaTopicManager createTopicManager = null;
- try {
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
-
- if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
- createTopicManager.revokeProducer(topicName, subscriberApiKey);
- } else {
- createTopicManager.revokeConsumer(topicName, subscriberApiKey);
- }
-
- } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
- logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
- return cambriaErrorResponse;
-
- } catch (HttpException | IOException e) {
- logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase());
-
- return cambriaErrorResponse;
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
- return cambriaErrorResponse;
- }
-
- /**
- *
- * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
- *
- * @param hostSet
- * @param topicName
- * @param managerApiKey
- * @param managerSecretKey
- * @param subscriberApiKey
- * @param subscriberTypeEnum
- * @return
- */
- public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
-
- CambriaTopicManager createTopicManager = null;
- try {
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
-
- if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
- createTopicManager.allowProducer(topicName, subscriberApiKey);
- } else {
- createTopicManager.allowConsumer(topicName, subscriberApiKey);
- }
-
- } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
- logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
- return cambriaErrorResponse;
-
- } catch (HttpException | IOException e) {
- logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase());
-
- return cambriaErrorResponse;
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
- return cambriaErrorResponse;
- }
-
- /**
- * create and retrieve a Cambria Consumer for a specific topic
- *
- * @param hostSet
- * @param topicName
- * @param apiKey
- * @param secretKey
- * @param consumerId
- * @param consumerGroup
- * @param timeoutMS
- * @return
- * @throws Exception
- */
- public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
-
- CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps().usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
- consumer.setApiCredentials(apiKey, secretKey);
- return consumer;
- }
-
- public void closeConsumer(CambriaConsumer consumer) {
-
- if (consumer != null) {
- consumer.close();
- }
-
- }
-
- /**
- * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
- *
- * @param topicConsumer
- * @return
- */
- public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
-
- try {
- Iterable<String> messages = topicConsumer.fetch();
- if (messages == null) {
- messages = new ArrayList<String>();
- }
- return Either.left(messages);
-
- } catch (IOException e) {
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic");
-
- return Either.right(cambriaErrorResponse);
-
- } catch (Exception e) {
- logger.debug("Failed to fetch from U-EB topic", e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
- return Either.right(cambriaErrorResponse);
- }
- }
-
- /**
- * Publish notification message to a given queue
- *
- * @param topicName
- * @param uebPublicKey
- * @param uebSecretKey
- * @param uebServers
- * @param data
- * @return
- */
- public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
-
- CambriaBatchingPublisher createSimplePublisher = null;
-
- try {
-
- String json = gson.toJson(data);
- logger.trace("Before sending notification data {} to topic {}", json, topicName);
-
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build();
- createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
-
- int result = createSimplePublisher.send(PARTITION_KEY, json);
-
- try {
- Thread.sleep(1 * 1000);
- } catch (InterruptedException e) {
- logger.debug("Failed during sleep after sending the message.", e);
- }
-
- logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
-
- CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
-
- return response;
-
- } catch (IOException | GeneralSecurityException e) {
- logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
-
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "send notification");
-
- return cambriaErrorResponse;
- } finally {
- if (createSimplePublisher != null) {
- logger.debug("Before closing publisher");
- createSimplePublisher.close();
- logger.debug("After closing publisher");
- }
- }
- }
-
- private String convertListToString(List<String> list) {
- StringBuilder builder = new StringBuilder();
-
- if (list != null) {
- for (int i = 0; i < list.size(); i++) {
- builder.append(list.get(i));
- if (i < list.size() - 1) {
- builder.append(",");
- }
- }
- }
-
- return builder.toString();
- }
-
- public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
-
- CambriaBatchingPublisher createSimplePublisher = null;
-
- CambriaErrorResponse response = null;
- try {
-
- String json = gson.toJson(data);
- logger.debug("Before sending notification data {} to topic {}", json, topicName);
-
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build();
- createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
-
- int result = createSimplePublisher.send(PARTITION_KEY, json);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.debug("Failed during sleep after sending the message.", e);
- }
-
- logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
-
- } catch (IOException | GeneralSecurityException e) {
- logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
-
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- response = processError(e);
-
- writeErrorToLog(response, e.getMessage(), methodName, "send notification");
-
- return response;
-
- }
-
- logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
- try {
- List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
- if (messagesInQ != null && false == messagesInQ.isEmpty()) {
- logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
- response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
- writeErrorToLog(response, "closing publisher returned non sent messages", methodName, "send notification");
- } else {
- logger.debug("No message left in the queue after closing cambria publisher");
- response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
- }
- } catch (IOException | InterruptedException e) {
- logger.debug("Failed to close cambria publisher", e);
- response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
- writeErrorToLog(response, "closing publisher returned non sent messages", methodName, "send notification");
- }
- logger.debug("After closing publisher");
-
- return response;
-
- }
-
- public CambriaErrorResponse getApiKey(String server, String apiKey) {
-
- CambriaErrorResponse response = null;
-
- List<String> hostSet = new ArrayList<>();
- hostSet.add(server);
- CambriaIdentityManager createIdentityManager = null;
- try {
- createIdentityManager = new IdentityManagerBuilder().usingHttps().usingHosts(hostSet).build();
- createIdentityManager.getApiKey(apiKey);
- response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
-
- } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
- logger.debug("Failed to fetch api key {} from server ", apiKey, server, e);
-
- response = processError(e);
+import org.apache.http.HttpStatus;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.springframework.stereotype.Component;
- }
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
- return response;
- }
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+@Component("cambriaHandler")
+public class CambriaHandler implements ICambriaHandler{
+
+ private static final Logger log = Logger.getLogger(CambriaHandler.class.getName());
+ private static final String PARTITION_KEY = "asdc" + "aa";
+ private static final String SEND_NOTIFICATION = "send notification";
+ private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager()
+ .getDistributionEngineConfiguration()
+ .getDistributionStatusTopic()
+ .getConsumerId();
+ private static final boolean USE_HTTPS_WITH_DMAAP = ConfigurationManager.getConfigurationManager()
+ .getDistributionEngineConfiguration()
+ .isUseHttpsWithDmaap();
+ private final Gson gson = new Gson();
+
+
+ /**
+ * process the response error from Cambria client
+ *
+ * @param message
+ * @return
+ */
+ private Integer processMessageException(String message) {
+
+ String[] patterns = {"(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)"};
+
+ Integer result = checkPattern(patterns[0], message, 2);
+ if (result != null) {
+ return result;
+ }
+ result = checkPattern(patterns[1], message, 2);
+
+ return result;
+
+ }
+
+ /**
+ * check whether the message has a match with a given pattern inside it
+ *
+ * @param patternStr
+ * @param message
+ * @param groupIndex
+ * @return
+ */
+ private Integer checkPattern(String patternStr, String message, int groupIndex) {
+ Integer result = null;
+
+ Pattern pattern = Pattern.compile(patternStr);
+ Matcher matcher = pattern.matcher(message);
+ boolean find = matcher.find();
+ if (find) {
+ String httpCode = matcher.group(groupIndex);
+ if (httpCode != null) {
+ try {
+ result = Integer.valueOf(httpCode);
+ }
+ catch (NumberFormatException e) {
+ log.debug("Failed to parse http code {}", httpCode);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * retrieve all topics from U-EB server
+ *
+ * @param hostSet
+ * @return
+ */
+ @Override
+ public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
+
+ CambriaTopicManager createTopicManager = null;
+ try {
+
+ createTopicManager = buildCambriaClient(createTopicManagerBuilder(hostSet));
+
+ Set<String> topics = createTopicManager.getTopics();
+
+ if (topics == null || topics.isEmpty()) {
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
+ return Either.right(cambriaErrorResponse);
+ }
+
+ return Either.left(topics);
+
+ }
+ catch (IOException | GeneralSecurityException e) {
+
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ log.debug("Failed to fetch topics from U-EB server", e);
+ writeErrorToLog(cambriaErrorResponse, "getTopics", "get topics");
+
+ return Either.right(cambriaErrorResponse);
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+
+ }
+
+ /**
+ * process the error message from Cambria client.
+ * <p>
+ * set Cambria status and http code in case we succeed to fetch it
+ *
+ * @return
+ */
+ private CambriaErrorResponse processError(Exception e) {
+
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
+
+ Integer httpCode = processMessageException(e.getMessage());
+
+ if (httpCode != null) {
+ cambriaErrorResponse.setHttpCode(httpCode);
+ switch (httpCode.intValue()) {
+
+ case 401:
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
+ break;
+ case 409:
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
+ break;
+ case 500:
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ break;
+ default:
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
+ }
+ }
+ else {
+
+ boolean found = false;
+ Throwable throwable = e.getCause();
+ if (throwable != null) {
+ String message = throwable.getMessage();
+
+ Throwable cause = throwable.getCause();
+
+ if (cause != null) {
+ Class<?> clazz = cause.getClass();
+ String className = clazz.getName();
+ if (className.endsWith("UnknownHostException")) {
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
+ cambriaErrorResponse.addVariable(message);
+ found = true;
+ }
+ }
+ }
+
+ if (!found) {
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
+ cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ return cambriaErrorResponse;
+ }
+
+ /**
+ * write the error to the log
+ * @param cambriaErrorResponse
+ * @param methodName
+ * @param operationDesc
+ */
+ private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String methodName, String operationDesc) {
+
+ String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode());
+
+ switch (cambriaErrorResponse.getOperationStatus()) {
+ case UNKNOWN_HOST_ERROR:
+ BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
+ break;
+ case AUTHENTICATION_ERROR:
+ BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
+ break;
+ case CONNNECTION_ERROR:
+ BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
+ break;
+ case INTERNAL_SERVER_ERROR:
+ BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
+ break;
+ default:
+ break;
+ }
+
+ }
+
+ /**
+ * create a topic if it does not exists in the topicsList
+ *
+ * @param hostSet - list of U-EB servers
+ * @param apiKey
+ * @param secretKey
+ * @param topicName - topic to create
+ * @param partitionCount
+ * @param replicationCount
+ * @return
+ */
+ @Override
+ public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
+
+ CambriaTopicManager createTopicManager = null;
+ try {
+
+ AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, apiKey, secretKey);
+ createTopicManager = buildCambriaClient(clientBuilder);
+
+ createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
+
+ }
+ catch (HttpException | IOException | GeneralSecurityException e) {
+
+ log.debug("Failed to create topic {}", topicName, e);
+
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
+ writeErrorToLog(cambriaErrorResponse, "createTopic", "create topic");
+ }
+
+ return cambriaErrorResponse;
+
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+ return new CambriaErrorResponse(CambriaOperationStatus.OK);
+
+ }
+ @Override
+ public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
+ String methodName = "unRegisterFromTopic";
+ CambriaTopicManager createTopicManager = null;
+ try {
+ AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, managerApiKey, managerSecretKey);
+
+ createTopicManager = buildCambriaClient(clientBuilder);
+
+ if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
+ createTopicManager.revokeProducer(topicName, subscriberApiKey);
+ }
+ else {
+ createTopicManager.revokeConsumer(topicName, subscriberApiKey);
+ }
+
+ }
+ catch (HttpObjectNotFoundException e) {
+ log.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
+ .toLowerCase(), e);
+ BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
+
+ return new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
+
+ }
+ catch (HttpException | IOException | GeneralSecurityException e) {
+ log.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ writeErrorToLog(cambriaErrorResponse, methodName, "unregister from topic as " + subscriberTypeEnum
+ .toString()
+ .toLowerCase());
+
+ return cambriaErrorResponse;
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+
+ return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
+ }
+
+ private AbstractAuthenticatedManagerBuilder<CambriaTopicManager> createTopicManagerBuilder(Collection<String> hostSet, String managerApiKey, String managerSecretKey) {
+ AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet)
+ .authenticatedBy(managerApiKey, managerSecretKey);
+ if (USE_HTTPS_WITH_DMAAP) {
+ clientBuilder = clientBuilder.usingHttps();
+ }
+
+ return clientBuilder;
+ }
+
+ private AbstractAuthenticatedManagerBuilder<CambriaTopicManager> createTopicManagerBuilder(Collection<String> hostSet) {
+ return new TopicManagerBuilder().usingHosts(hostSet);
+ }
+
+ /**
+ * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
+ *
+ * @param hostSet
+ * @param managerApiKey
+ * @param managerSecretKey
+ * @param subscriberApiKey
+ * @param subscriberTypeEnum
+ * @param topicName
+ * @return
+ */
+ @Override
+ public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
+
+ String methodName = "registerToTopic";
+ CambriaTopicManager createTopicManager = null;
+ try {
+ AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, managerApiKey, managerSecretKey);
+ createTopicManager = buildCambriaClient(clientBuilder);
+
+ if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
+ createTopicManager.allowProducer(topicName, subscriberApiKey);
+ }
+ else {
+ createTopicManager.allowConsumer(topicName, subscriberApiKey);
+ }
+
+ }
+ catch (HttpObjectNotFoundException e) {
+ log.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
+ .toLowerCase(), e);
+
+ BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
+
+ return new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
+
+ }
+ catch (HttpException | IOException | GeneralSecurityException e) {
+ log.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
+ .toLowerCase(), e);
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ writeErrorToLog(cambriaErrorResponse, methodName, "register to topic as " + subscriberTypeEnum
+ .toString()
+ .toLowerCase());
+
+ return cambriaErrorResponse;
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+
+ return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
+ }
+
+ /**
+ * create and retrieve a Cambria Consumer for a specific topic
+ *
+ * @param hostSet
+ * @param topicName
+ * @param apiKey
+ * @param secretKey
+ * @param consumerId
+ * @param consumerGroup
+ * @param timeoutMS
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
+
+ CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey)
+ .knownAs(consumerGroup, consumerId)
+ .onTopic(topicName)
+ .usingHosts(hostSet)
+ .waitAtServer(timeoutMS)
+ .build();
+ consumer.setApiCredentials(apiKey, secretKey);
+ return consumer;
+ }
+
+ public void closeConsumer(CambriaConsumer consumer) {
+
+ if (consumer != null) {
+ consumer.close();
+ }
+
+ }
+
+ /**
+ * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
+ *
+ * @param topicConsumer
+ * @return
+ */
+ @Override
+ public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
+
+ String methodName = "fetchFromTopic";
+ try {
+ Iterable<String> messages = topicConsumer.fetch();
+ if (messages == null) {
+ messages = new ArrayList<>();
+ }
+ return Either.left(messages);
+
+ }
+ catch (IOException e) {
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+ log.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
+ writeErrorToLog(cambriaErrorResponse, methodName, "get messages from topic");
+ return Either.right(cambriaErrorResponse);
+
+ }
+ catch (Exception e) {
+ log.debug("Failed to fetch from U-EB topic", e);
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ return Either.right(cambriaErrorResponse);
+ }
+ }
+
+ /**
+ * Publish notification message to a given queue
+ *
+ * @param topicName
+ * @param uebPublicKey
+ * @param uebSecretKey
+ * @param uebServers
+ * @param data
+ * @return
+ */
+ @Override
+ public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
+
+ CambriaBatchingPublisher createSimplePublisher = null;
+
+ try {
+
+ String json = gson.toJson(data);
+ log.trace("Before sending notification data {} to topic {}", json, topicName);
+
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
+ createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
+
+ int result = createSimplePublisher.send(PARTITION_KEY, json);
+
+ try {
+ SECONDS.sleep(1L);
+ }
+ catch (InterruptedException e) {
+ log.debug("Failed during sleep after sending the message.", e);
+ Thread.currentThread().interrupt();
+ }
+
+ log.debug("After sending notification data to topic {}. result is {}", topicName, result);
+
+ return new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
+
+ } catch (IOException | GeneralSecurityException e) {
+ log.debug("Failed to send notification {} to topic {} ", data, topicName, e);
+
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ writeErrorToLog(cambriaErrorResponse, "sendNotification", SEND_NOTIFICATION);
+
+ return cambriaErrorResponse;
+ }
+ finally {
+ if (createSimplePublisher != null) {
+ log.debug("Before closing publisher");
+ createSimplePublisher.close();
+ log.debug("After closing publisher");
+ }
+ }
+ }
+ @Override
+ public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
+ String methodName = "sendNotificationAndClose";
+ CambriaBatchingPublisher createSimplePublisher;
+ CambriaErrorResponse response;
+ try {
+
+ String json = gson.toJson(data);
+ log.debug("Before sending notification data {} to topic {}", json, topicName);
+
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
+ createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
+
+ int result = createSimplePublisher.send(PARTITION_KEY, json);
+
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ log.debug("Failed during sleep after sending the message.", e);
+ Thread.currentThread().interrupt();
+ }
+
+ log.debug("After sending notification data to topic {}. result is {}", topicName, result);
+
+ }
+ catch (IOException | GeneralSecurityException e) {
+ log.debug("Failed to send notification {} to topic {} ", data, topicName, e);
+
+
+ response = processError(e);
+
+ writeErrorToLog(response, methodName, SEND_NOTIFICATION);
+
+ return response;
+
+ }
+
+ log.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
+ try {
+ List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, SECONDS);
+ if (messagesInQ != null && !messagesInQ.isEmpty()) {
+ log.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
+ response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ writeErrorToLog(response, methodName, SEND_NOTIFICATION);
+ }
+ else {
+ log.debug("No message left in the queue after closing cambria publisher");
+ response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
+ }
+ }
+ catch (InterruptedException e) {
+ log.debug("InterruptedException while closing cambria publisher", e);
+ Thread.currentThread().interrupt();
+ response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ writeErrorToLog(response, methodName, SEND_NOTIFICATION);
+ }
+ catch (IOException e) {
+ log.debug("Failed to close cambria publisher", e);
+ response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ writeErrorToLog(response, methodName, SEND_NOTIFICATION);
+ }
+ log.debug("After closing publisher");
+
+ return response;
+
+ }
+ @Override
+ public CambriaErrorResponse getApiKey(String server, String apiKey) {
+
+ CambriaErrorResponse response;
+ List<String> hostSet = new ArrayList<>();
+ hostSet.add(server);
+ try {
+ CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
+ createIdentityManager.getApiKey(apiKey);
+
+ response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
+
+ }
+ catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
+ log.debug("Failed to fetch api key {} from server {}", apiKey, server, e);
+
+ response = processError(e);
+
+ }
+
+ return response;
+ }
+ @Override
+ public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) {
+ Either<ApiCredential, CambriaErrorResponse> result;
+
+ try {
+ CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
+
+ String description = String.format("ASDC Key for %s", CONSUMER_ID);
+ ApiCredential credential = createIdentityManager.createApiKey("", description);
+ createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
+ result = Either.left(credential);
+
+ }
+ catch (Exception e) {
+ log.debug("Failed to create ueb keys for servers {}", hostSet, e);
+
+ result = Either.right(processError(e));
+ }
+
+ return result;
+ }
+
+ @VisibleForTesting
+ <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<T> client) throws MalformedURLException, GeneralSecurityException {
+ return client.build();
+ }
}