2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.security.GeneralSecurityException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import java.util.regex.Matcher;
33 import java.util.regex.Pattern;
35 import com.att.nsa.cambria.client.*;
36 import org.apache.http.HttpStatus;
37 import org.openecomp.sdc.be.config.BeEcompErrorManager;
38 import org.openecomp.sdc.be.config.ConfigurationManager;
39 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
40 import org.openecomp.sdc.common.config.EcompErrorName;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import com.att.nsa.apiClient.http.HttpException;
45 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
46 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
47 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
48 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
49 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
50 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
51 import com.att.nsa.cambria.client.CambriaPublisher.message;
52 import com.google.gson.Gson;
54 import fj.data.Either;
55 import jline.internal.Log;
57 public class CambriaHandler {
59 private static Logger logger = LoggerFactory.getLogger(CambriaHandler.class.getName());
61 public static String PARTITION_KEY = "asdc" + "aa";
63 private Gson gson = new Gson();
65 public static boolean useHttpsWithDmaap = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().isUseHttpsWithDmaap();
67 public static void main(String[] args) {
69 // String userBodyJson ="{\"artifactName\":\"myartifact\",
70 // \"artifactType\":\"MURANO-PKG\",
71 // \"artifactDescription\":\"description\",
72 // \"payloadData\":\"UEsDBAoAAAAIAAeLb0bDQz\", \"Content-MD5\":
73 // \"YTg2Mjg4MWJhNmI5NzBiNzdDFkMWI=\" }";
74 // System.out.println(userBodyJson);
75 // String encodeBase64Str = GeneralUtililty.calculateMD5 (userBodyJson);
76 // System.out.println(encodeBase64Str);
78 CambriaTopicManager createTopicManager = null;
80 List<String> servers = new ArrayList<String>();
81 // servers.add("uebsb91kcdc.it.sdc.com:3904");
82 // servers.add("uebsb92kcdc.it.sdc.com:3904");
83 // servers.add("uebsb93kcdc.it.sdc.com:3904");
84 servers.add("uebsb91sfdc.it.att.com:3904");
85 servers.add("uebsb92sfdc.it.att.com:3904");
87 String key = "sSJc5qiBnKy2qrlc";
88 String secret = "4ZRPzNJfEUK0sSNBvccd2m7X";
90 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret));
92 String topicName = "ASDC-DISTR-NOTIF-TOPIC-PRODesofer";
94 String clientKey1 = "CGGoorrGPXPx2B1C";
95 String clientSecret1 = "OTHk2mcCSbskEtHhDw8h5oUa";
97 CambriaTopicManager createStatusTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret));
98 String reportTopic = "ASDC-DISTR-STATUS-TOPIC-PRODESOFER";
99 createStatusTopicManager.allowProducer(reportTopic, clientKey1);
101 CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
102 createSimplePublisher.setApiCredentials(clientKey1, clientSecret1);
104 DistributionStatusNotification distributionStatusNotification = new DistributionStatusNotification();
105 distributionStatusNotification.setStatus(DistributionStatusNotificationEnum.DEPLOY_OK);
106 distributionStatusNotification.setArtifactURL("Ssssssss url");
107 distributionStatusNotification.setDistributionID("idddddddddddddd");
108 distributionStatusNotification.setTimestamp(System.currentTimeMillis());
109 distributionStatusNotification.setConsumerID("my consumer id");
111 Gson gson = new Gson();
112 int result = createSimplePublisher.send(PARTITION_KEY, gson.toJson(distributionStatusNotification));
114 List<message> messagesInQ = createSimplePublisher.close(20, TimeUnit.SECONDS);
115 System.out.println(messagesInQ == null ? 0 : messagesInQ.size());
117 // createTopicManager.createTopic(topicName, "my test topic", 1, 1);
121 * { "secret": "OTHk2mcCSbskEtHhDw8h5oUa", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "CGGoorrGPXPx2B1C" }
124 * { "secret": "FSlNJbmGWWBvBLJetQMYxPP6", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "TAIEPO0aDU4VzM0G" }
128 String clientKey2 = "TAIEPO0aDU4VzM0G";
130 CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
131 createConsumer1.setApiCredentials(clientKey1, "OTHk2mcCSbskEtHhDw8h5oUa");
133 createTopicManager.allowConsumer(topicName, clientKey1);
135 CambriaConsumer createConsumer2 = null;
137 createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
138 createConsumer2.setApiCredentials(clientKey2, "FSlNJbmGWWBvBLJetQMYxPP6");
140 createTopicManager.allowConsumer(topicName, clientKey2);
143 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
144 createSimplePublisher.setApiCredentials(key, secret);
145 createTopicManager.allowProducer(topicName, key);
147 createSimplePublisher.send("aaaa", "{ my testttttttttttttttt }");
151 Iterable<String> fetch1 = createConsumer1.fetch();
153 Iterator<String> iterator1 = fetch1.iterator();
154 while (iterator1.hasNext()) {
155 System.out.println("***********************************************");
156 System.out.println("client 1" + iterator1.next());
157 System.out.println("***********************************************");
160 if (createConsumer2 != null) {
161 Iterable<String> fetch2 = createConsumer2.fetch();
163 Iterator<String> iterator2 = fetch2.iterator();
164 while (iterator2.hasNext()) {
165 System.out.println("***********************************************");
166 System.out.println("client 2" + iterator2.next());
167 System.out.println("***********************************************");
170 Thread.sleep(1000 * 20);
173 // createTopicManager = CambriaClientFactory.createTopicManager(
174 // servers, "8F3MDAtMSBwwpSMy", "gzFmsTxSCtO5RQfAccM6PqqX");
176 // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD");
177 // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD1");
179 // CambriaIdentityManager createIdentityManager =
180 // CambriaClientFactory.createIdentityManager(null, null, null);
181 // createIdentityManager.setApiCredentials(arg0, arg1);
182 // createIdentityManager.cl
184 // String topicName = " ";
185 // createTopicManager.createTopic(topicName,
186 // "ASDC distribution notification topic", 1, 1);
188 // Thread.sleep(10 * 1000);
190 // for (int i = 0; i < 5; i++) {
192 // boolean openForProducing = createTopicManager
193 // .isOpenForProducing(topicName);
195 // System.out.println("openForProducing=" + openForProducing);
196 // createTopicManager.allowProducer(topicName,
197 // "8F3MDAtMSBwwpSMy");
198 // Set<String> allowedProducers = createTopicManager
199 // .getAllowedProducers(topicName);
200 // System.out.println(allowedProducers);
202 // } catch (Exception e) {
203 // e.printStackTrace();
207 // createTopicManager.createTopic("", "", 0, 0);
208 // createTopicManager.allowProducer(arg0, arg1);
209 // createTopicManager.getTopics();
210 // createTopicManager.close();
211 // CambriaClientFactory.
212 // CambriaBatchingPublisher createSimplePublisher =
213 // CambriaClientFactory.createSimplePublisher("hostlist", "topic");
215 // CambriaIdentityManager createIdentityManager =
216 // CambriaClientFactory.createIdentityManager(null, "apiKey",
218 // createIdentityManager.
220 } catch (Exception e) {
221 Log.debug("Exception in main test of Cambria Handler: {}", e.getMessage(), e);
224 if (createTopicManager != null) {
225 createTopicManager.close();
231 * process the response error from Cambria client
236 private Integer processMessageException(String message) {
238 String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" };
240 Integer result = checkPattern(patterns[0], message, 2);
241 if (result != null) {
244 result = checkPattern(patterns[1], message, 2);
251 * check whether the message has a match with a given pattern inside it
258 private Integer checkPattern(String patternStr, String message, int groupIndex) {
259 Integer result = null;
261 Pattern pattern = Pattern.compile(patternStr);
262 Matcher matcher = pattern.matcher(message);
263 boolean find = matcher.find();
265 String httpCode = matcher.group(groupIndex);
266 if (httpCode != null) {
268 result = Integer.valueOf(httpCode);
269 } catch (NumberFormatException e) {
270 logger.debug("Failed to parse http code {}", httpCode);
278 * retrieve all topics from U-EB server
283 public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
285 CambriaTopicManager createTopicManager = null;
288 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
290 Set<String> topics = createTopicManager.getTopics();
292 if (topics == null || true == topics.isEmpty()) {
293 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
294 return Either.right(cambriaErrorResponse);
297 return Either.left(topics);
299 } catch (IOException | GeneralSecurityException e) {
300 String methodName = new Object() {
301 }.getClass().getEnclosingMethod().getName();
303 CambriaErrorResponse cambriaErrorResponse = processError(e);
305 logger.debug("Failed to fetch topics from U-EB server", e);
306 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics");
308 return Either.right(cambriaErrorResponse);
310 if (createTopicManager != null) {
311 createTopicManager.close();
318 * process the error message from Cambria client.
320 * set Cambria status and http code in case we succeed to fetch it
324 private CambriaErrorResponse processError(Exception e) {
326 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
328 Integer httpCode = processMessageException(e.getMessage());
330 if (httpCode != null) {
331 cambriaErrorResponse.setHttpCode(httpCode);
332 switch (httpCode.intValue()) {
335 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
338 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
341 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
344 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
348 boolean found = false;
349 Throwable throwable = e.getCause();
350 if (throwable != null) {
351 String message = throwable.getMessage();
353 Throwable cause = throwable.getCause();
356 Class<?> clazz = cause.getClass();
357 String className = clazz.getName();
358 if (className.endsWith("UnknownHostException")) {
359 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
360 cambriaErrorResponse.addVariable(message);
366 if (false == found) {
367 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
368 cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
372 return cambriaErrorResponse;
376 * write the error to the log
378 * @param cambriaErrorResponse
379 * @param errorMessage
381 * @param operationDesc
383 private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) {
385 String httpCode = (cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode()));
387 switch (cambriaErrorResponse.getOperationStatus()) {
388 case UNKNOWN_HOST_ERROR:
389 String hostname = cambriaErrorResponse.getVariables().get(0);
390 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebUnkownHostError, methodName, hostname);
391 BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
393 case AUTHENTICATION_ERROR:
394 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebAuthenticationError, methodName, httpCode);
395 BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
397 case CONNNECTION_ERROR:
398 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebConnectionError, methodName, httpCode);
399 BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
402 case INTERNAL_SERVER_ERROR:
403 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, methodName, operationDesc);
404 BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
412 * create a topic if it does not exists in the topicsList
415 * - list of U-EB servers
420 * @param partitionCount
421 * @param replicationCount
424 public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
426 CambriaTopicManager createTopicManager = null;
429 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey));
431 createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
433 } catch (HttpException | IOException | GeneralSecurityException e) {
435 logger.debug("Failed to create topic {}", topicName, e);
436 String methodName = new Object() {
437 }.getClass().getEnclosingMethod().getName();
439 CambriaErrorResponse cambriaErrorResponse = processError(e);
441 if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
442 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic");
445 return cambriaErrorResponse;
448 if (createTopicManager != null) {
449 createTopicManager.close();
452 return new CambriaErrorResponse(CambriaOperationStatus.OK);
456 public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
457 CambriaTopicManager createTopicManager = null;
459 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
461 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
462 createTopicManager.revokeProducer(topicName, subscriberApiKey);
464 createTopicManager.revokeConsumer(topicName, subscriberApiKey);
467 } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
468 logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
469 String methodName = new Object() {
470 }.getClass().getEnclosingMethod().getName();
472 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
473 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
475 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
476 return cambriaErrorResponse;
478 } catch (HttpException | IOException e) {
479 logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
480 String methodName = new Object() {
481 }.getClass().getEnclosingMethod().getName();
483 CambriaErrorResponse cambriaErrorResponse = processError(e);
485 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase());
487 return cambriaErrorResponse;
489 if (createTopicManager != null) {
490 createTopicManager.close();
494 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
495 return cambriaErrorResponse;
500 * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
504 * @param managerApiKey
505 * @param managerSecretKey
506 * @param subscriberApiKey
507 * @param subscriberTypeEnum
510 public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
512 CambriaTopicManager createTopicManager = null;
514 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
516 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
517 createTopicManager.allowProducer(topicName, subscriberApiKey);
519 createTopicManager.allowConsumer(topicName, subscriberApiKey);
522 } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
523 logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
524 String methodName = new Object() {
525 }.getClass().getEnclosingMethod().getName();
527 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
528 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
530 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
531 return cambriaErrorResponse;
533 } catch (HttpException | IOException e) {
534 logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
535 String methodName = new Object() {
536 }.getClass().getEnclosingMethod().getName();
538 CambriaErrorResponse cambriaErrorResponse = processError(e);
540 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase());
542 return cambriaErrorResponse;
544 if (createTopicManager != null) {
545 createTopicManager.close();
549 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
550 return cambriaErrorResponse;
554 * create and retrieve a Cambria Consumer for a specific topic
561 * @param consumerGroup
566 public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
568 CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
569 consumer.setApiCredentials(apiKey, secretKey);
573 public void closeConsumer(CambriaConsumer consumer) {
575 if (consumer != null) {
582 * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
584 * @param topicConsumer
587 public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
590 Iterable<String> messages = topicConsumer.fetch();
591 if (messages == null) {
592 messages = new ArrayList<String>();
594 return Either.left(messages);
596 } catch (IOException e) {
597 String methodName = new Object() {
598 }.getClass().getEnclosingMethod().getName();
600 CambriaErrorResponse cambriaErrorResponse = processError(e);
602 logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
603 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic");
605 return Either.right(cambriaErrorResponse);
607 } catch (Exception e) {
608 logger.debug("Failed to fetch from U-EB topic", e);
609 String methodName = new Object() {
610 }.getClass().getEnclosingMethod().getName();
612 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
613 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
615 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
616 return Either.right(cambriaErrorResponse);
621 * Publish notification message to a given queue
624 * @param uebPublicKey
625 * @param uebSecretKey
630 public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
632 CambriaBatchingPublisher createSimplePublisher = null;
636 String json = gson.toJson(data);
637 logger.trace("Before sending notification data {} to topic {}", json, topicName);
639 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
640 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
642 int result = createSimplePublisher.send(PARTITION_KEY, json);
645 Thread.sleep(1 * 1000);
646 } catch (InterruptedException e) {
647 logger.debug("Failed during sleep after sending the message.", e);
650 logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
652 CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
656 } catch (IOException | GeneralSecurityException e) {
657 logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
659 String methodName = new Object() {
660 }.getClass().getEnclosingMethod().getName();
662 CambriaErrorResponse cambriaErrorResponse = processError(e);
664 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "send notification");
666 return cambriaErrorResponse;
668 if (createSimplePublisher != null) {
669 logger.debug("Before closing publisher");
670 createSimplePublisher.close();
671 logger.debug("After closing publisher");
676 private String convertListToString(List<String> list) {
677 StringBuilder builder = new StringBuilder();
680 for (int i = 0; i < list.size(); i++) {
681 builder.append(list.get(i));
682 if (i < list.size() - 1) {
688 return builder.toString();
691 public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
693 CambriaBatchingPublisher createSimplePublisher = null;
695 CambriaErrorResponse response = null;
698 String json = gson.toJson(data);
699 logger.debug("Before sending notification data {} to topic {}", json, topicName);
701 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
702 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
704 int result = createSimplePublisher.send(PARTITION_KEY, json);
708 } catch (InterruptedException e) {
709 logger.debug("Failed during sleep after sending the message.", e);
712 logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
714 } catch (IOException | GeneralSecurityException e) {
715 logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
717 String methodName = new Object() {
718 }.getClass().getEnclosingMethod().getName();
720 response = processError(e);
722 writeErrorToLog(response, e.getMessage(), methodName, "send notification");
728 logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
730 List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
731 if (messagesInQ != null && false == messagesInQ.isEmpty()) {
732 logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
733 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
734 String methodName = new Object() {
735 }.getClass().getEnclosingMethod().getName();
736 writeErrorToLog(response, "closing publisher returned non sent messages", methodName, "send notification");
738 logger.debug("No message left in the queue after closing cambria publisher");
739 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
741 } catch (IOException | InterruptedException e) {
742 logger.debug("Failed to close cambria publisher", e);
743 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
744 String methodName = new Object() {
745 }.getClass().getEnclosingMethod().getName();
746 writeErrorToLog(response, "closing publisher returned non sent messages", methodName, "send notification");
748 logger.debug("After closing publisher");
754 public CambriaErrorResponse getApiKey(String server, String apiKey) {
756 CambriaErrorResponse response = null;
758 List<String> hostSet = new ArrayList<>();
760 CambriaIdentityManager createIdentityManager = null;
762 createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
763 createIdentityManager.getApiKey(apiKey);
764 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
766 } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
767 logger.debug("Failed to fetch api key {} from server ", apiKey, server, e);
769 response = processError(e);
776 private static <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
777 if (useHttpsWithDmaap) {
780 return (T)client.build();