2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import com.att.nsa.apiClient.credentials.ApiCredential;
24 import com.att.nsa.apiClient.http.HttpException;
25 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
26 import com.att.nsa.cambria.client.*;
27 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
28 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
29 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
30 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
31 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
32 import com.att.nsa.cambria.client.CambriaPublisher.message;
33 import com.google.common.annotations.VisibleForTesting;
34 import com.google.gson.Gson;
35 import fj.data.Either;
36 import org.apache.http.HttpStatus;
37 import org.openecomp.sdc.be.config.BeEcompErrorManager;
38 import org.openecomp.sdc.be.config.ConfigurationManager;
39 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
40 import org.openecomp.sdc.common.log.wrappers.Logger;
41 import org.springframework.stereotype.Component;
43 import java.io.IOException;
44 import java.net.MalformedURLException;
45 import java.security.GeneralSecurityException;
46 import java.util.ArrayList;
47 import java.util.Collection;
48 import java.util.List;
50 import java.util.regex.Matcher;
51 import java.util.regex.Pattern;
53 import static java.util.concurrent.TimeUnit.SECONDS;
55 @Component("cambriaHandler")
56 public class CambriaHandler {
58 private static final Logger log = Logger.getLogger(CambriaHandler.class.getName());
59 private static final String PARTITION_KEY = "asdc" + "aa";
60 private static final String SEND_NOTIFICATION = "send notification";
61 private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager()
62 .getDistributionEngineConfiguration()
63 .getDistributionStatusTopic()
65 private final Gson gson = new Gson();
69 * process the response error from Cambria client
74 private Integer processMessageException(String message) {
76 String[] patterns = {"(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)"};
78 Integer result = checkPattern(patterns[0], message, 2);
82 result = checkPattern(patterns[1], message, 2);
89 * check whether the message has a match with a given pattern inside it
96 private Integer checkPattern(String patternStr, String message, int groupIndex) {
97 Integer result = null;
99 Pattern pattern = Pattern.compile(patternStr);
100 Matcher matcher = pattern.matcher(message);
101 boolean find = matcher.find();
103 String httpCode = matcher.group(groupIndex);
104 if (httpCode != null) {
106 result = Integer.valueOf(httpCode);
108 catch (NumberFormatException e) {
109 log.debug("Failed to parse http code {}", httpCode);
117 * retrieve all topics from U-EB server
122 public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
124 CambriaTopicManager createTopicManager = null;
127 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
129 Set<String> topics = createTopicManager.getTopics();
131 if (topics == null || topics.isEmpty()) {
132 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
133 return Either.right(cambriaErrorResponse);
136 return Either.left(topics);
139 catch (IOException | GeneralSecurityException e) {
141 CambriaErrorResponse cambriaErrorResponse = processError(e);
143 log.debug("Failed to fetch topics from U-EB server", e);
144 writeErrorToLog(cambriaErrorResponse, "getTopics", "get topics");
146 return Either.right(cambriaErrorResponse);
148 if (createTopicManager != null) {
149 createTopicManager.close();
156 * process the error message from Cambria client.
158 * set Cambria status and http code in case we succeed to fetch it
162 private CambriaErrorResponse processError(Exception e) {
164 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
166 Integer httpCode = processMessageException(e.getMessage());
168 if (httpCode != null) {
169 cambriaErrorResponse.setHttpCode(httpCode);
170 switch (httpCode.intValue()) {
173 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
176 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
179 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
182 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
187 boolean found = false;
188 Throwable throwable = e.getCause();
189 if (throwable != null) {
190 String message = throwable.getMessage();
192 Throwable cause = throwable.getCause();
195 Class<?> clazz = cause.getClass();
196 String className = clazz.getName();
197 if (className.endsWith("UnknownHostException")) {
198 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
199 cambriaErrorResponse.addVariable(message);
206 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
207 cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
211 return cambriaErrorResponse;
215 * write the error to the log
216 * @param cambriaErrorResponse
218 * @param operationDesc
220 private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String methodName, String operationDesc) {
222 String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode());
224 switch (cambriaErrorResponse.getOperationStatus()) {
225 case UNKNOWN_HOST_ERROR:
226 BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
228 case AUTHENTICATION_ERROR:
229 BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
231 case CONNNECTION_ERROR:
232 BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
234 case INTERNAL_SERVER_ERROR:
235 BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
244 * create a topic if it does not exists in the topicsList
246 * @param hostSet - list of U-EB servers
249 * @param topicName - topic to create
250 * @param partitionCount
251 * @param replicationCount
254 public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
256 CambriaTopicManager createTopicManager = null;
259 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
260 .authenticatedBy(apiKey, secretKey));
262 createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
265 catch (HttpException | IOException | GeneralSecurityException e) {
267 log.debug("Failed to create topic {}", topicName, e);
269 CambriaErrorResponse cambriaErrorResponse = processError(e);
271 if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
272 writeErrorToLog(cambriaErrorResponse, "createTopic", "create topic");
275 return cambriaErrorResponse;
278 if (createTopicManager != null) {
279 createTopicManager.close();
282 return new CambriaErrorResponse(CambriaOperationStatus.OK);
286 public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
287 String methodName = "unRegisterFromTopic";
288 CambriaTopicManager createTopicManager = null;
290 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
291 .authenticatedBy(managerApiKey, managerSecretKey));
293 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
294 createTopicManager.revokeProducer(topicName, subscriberApiKey);
297 createTopicManager.revokeConsumer(topicName, subscriberApiKey);
301 catch (HttpObjectNotFoundException e) {
302 log.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
304 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
306 return new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
309 catch (HttpException | IOException | GeneralSecurityException e) {
310 log.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
311 CambriaErrorResponse cambriaErrorResponse = processError(e);
313 writeErrorToLog(cambriaErrorResponse, methodName, "unregister from topic as " + subscriberTypeEnum
317 return cambriaErrorResponse;
319 if (createTopicManager != null) {
320 createTopicManager.close();
324 return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
328 * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
331 * @param managerApiKey
332 * @param managerSecretKey
333 * @param subscriberApiKey
334 * @param subscriberTypeEnum
338 public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
340 String methodName = "registerToTopic";
341 CambriaTopicManager createTopicManager = null;
343 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
344 .authenticatedBy(managerApiKey, managerSecretKey));
346 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
347 createTopicManager.allowProducer(topicName, subscriberApiKey);
350 createTopicManager.allowConsumer(topicName, subscriberApiKey);
354 catch (HttpObjectNotFoundException e) {
355 log.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
358 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
360 return new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
363 catch (HttpException | IOException | GeneralSecurityException e) {
364 log.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
366 CambriaErrorResponse cambriaErrorResponse = processError(e);
368 writeErrorToLog(cambriaErrorResponse, methodName, "register to topic as " + subscriberTypeEnum
372 return cambriaErrorResponse;
374 if (createTopicManager != null) {
375 createTopicManager.close();
379 return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
383 * create and retrieve a Cambria Consumer for a specific topic
390 * @param consumerGroup
395 public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
397 CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey)
398 .knownAs(consumerGroup, consumerId)
401 .waitAtServer(timeoutMS)
403 consumer.setApiCredentials(apiKey, secretKey);
407 public void closeConsumer(CambriaConsumer consumer) {
409 if (consumer != null) {
416 * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
418 * @param topicConsumer
421 public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
423 String methodName = "fetchFromTopic";
425 Iterable<String> messages = topicConsumer.fetch();
426 if (messages == null) {
427 messages = new ArrayList<>();
429 return Either.left(messages);
432 catch (IOException e) {
433 CambriaErrorResponse cambriaErrorResponse = processError(e);
434 log.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
435 writeErrorToLog(cambriaErrorResponse, methodName, "get messages from topic");
436 return Either.right(cambriaErrorResponse);
439 catch (Exception e) {
440 log.debug("Failed to fetch from U-EB topic", e);
441 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
442 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
443 return Either.right(cambriaErrorResponse);
448 * Publish notification message to a given queue
451 * @param uebPublicKey
452 * @param uebSecretKey
457 public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
459 CambriaBatchingPublisher createSimplePublisher = null;
463 String json = gson.toJson(data);
464 log.trace("Before sending notification data {} to topic {}", json, topicName);
466 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
467 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
469 int result = createSimplePublisher.send(PARTITION_KEY, json);
474 catch (InterruptedException e) {
475 log.debug("Failed during sleep after sending the message.", e);
478 log.debug("After sending notification data to topic {}. result is {}", topicName, result);
480 return new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
482 } catch (IOException | GeneralSecurityException e) {
483 log.debug("Failed to send notification {} to topic {} ", data, topicName, e);
485 CambriaErrorResponse cambriaErrorResponse = processError(e);
487 writeErrorToLog(cambriaErrorResponse, "sendNotification", SEND_NOTIFICATION);
489 return cambriaErrorResponse;
492 if (createSimplePublisher != null) {
493 log.debug("Before closing publisher");
494 createSimplePublisher.close();
495 log.debug("After closing publisher");
500 public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
501 String methodName = "sendNotificationAndClose";
502 CambriaBatchingPublisher createSimplePublisher;
503 CambriaErrorResponse response;
506 String json = gson.toJson(data);
507 log.debug("Before sending notification data {} to topic {}", json, topicName);
509 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
510 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
512 int result = createSimplePublisher.send(PARTITION_KEY, json);
517 catch (InterruptedException e) {
518 log.debug("Failed during sleep after sending the message.", e);
521 log.debug("After sending notification data to topic {}. result is {}", topicName, result);
524 catch (IOException | GeneralSecurityException e) {
525 log.debug("Failed to send notification {} to topic {} ", data, topicName, e);
528 response = processError(e);
530 writeErrorToLog(response, methodName, SEND_NOTIFICATION);
536 log.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
538 List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, SECONDS);
539 if (messagesInQ != null && !messagesInQ.isEmpty()) {
540 log.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
541 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
542 writeErrorToLog(response, methodName, SEND_NOTIFICATION);
545 log.debug("No message left in the queue after closing cambria publisher");
546 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
549 catch (IOException | InterruptedException e) {
550 log.debug("Failed to close cambria publisher", e);
551 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
552 writeErrorToLog(response, methodName, SEND_NOTIFICATION);
554 log.debug("After closing publisher");
560 public CambriaErrorResponse getApiKey(String server, String apiKey) {
562 CambriaErrorResponse response;
563 List<String> hostSet = new ArrayList<>();
566 CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
567 createIdentityManager.getApiKey(apiKey);
569 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
572 catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
573 log.debug("Failed to fetch api key {} from server {}", apiKey, server, e);
575 response = processError(e);
582 public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) {
583 Either<ApiCredential, CambriaErrorResponse> result;
586 CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
588 String description = String.format("ASDC Key for %s", CONSUMER_ID);
589 ApiCredential credential = createIdentityManager.createApiKey("", description);
590 createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
591 result = Either.left(credential);
594 catch (Exception e) {
595 log.debug("Failed to create ueb keys for servers {}", hostSet, e);
597 result = Either.right(processError(e));
605 <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
606 return (T) client.build();