2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import com.att.nsa.apiClient.credentials.ApiCredential;
24 import com.att.nsa.apiClient.http.HttpException;
25 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
26 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
27 import com.att.nsa.cambria.client.CambriaClient;
28 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
29 import com.att.nsa.cambria.client.CambriaClientBuilders;
30 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
31 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
32 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
33 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
34 import com.att.nsa.cambria.client.CambriaConsumer;
35 import com.att.nsa.cambria.client.CambriaIdentityManager;
36 import com.att.nsa.cambria.client.CambriaPublisher.message;
37 import com.att.nsa.cambria.client.CambriaTopicManager;
38 import com.google.common.annotations.VisibleForTesting;
39 import com.google.gson.Gson;
40 import fj.data.Either;
41 import org.apache.http.HttpStatus;
42 import org.openecomp.sdc.be.config.BeEcompErrorManager;
43 import org.openecomp.sdc.be.config.ConfigurationManager;
44 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import org.springframework.stereotype.Component;
49 import java.io.IOException;
50 import java.net.MalformedURLException;
51 import java.security.GeneralSecurityException;
52 import java.util.ArrayList;
53 import java.util.Collection;
54 import java.util.List;
56 import java.util.concurrent.TimeUnit;
57 import java.util.regex.Matcher;
58 import java.util.regex.Pattern;
60 @Component("cambriaHandler")
61 public class CambriaHandler {
63 private static final Logger logger = LoggerFactory.getLogger(CambriaHandler.class);
65 private static final String PARTITION_KEY = "asdc" + "aa";
67 private final String SEND_NOTIFICATION = "send notification";
69 private Gson gson = new Gson();
71 private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionStatusTopic().getConsumerId();
76 * process the response error from Cambria client
81 private Integer processMessageException(String message) {
83 String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" };
85 Integer result = checkPattern(patterns[0], message, 2);
89 result = checkPattern(patterns[1], message, 2);
96 * check whether the message has a match with a given pattern inside it
103 private Integer checkPattern(String patternStr, String message, int groupIndex) {
104 Integer result = null;
106 Pattern pattern = Pattern.compile(patternStr);
107 Matcher matcher = pattern.matcher(message);
108 boolean find = matcher.find();
110 String httpCode = matcher.group(groupIndex);
111 if (httpCode != null) {
113 result = Integer.valueOf(httpCode);
114 } catch (NumberFormatException e) {
115 logger.debug("Failed to parse http code {}", httpCode);
123 * retrieve all topics from U-EB server
128 public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
130 CambriaTopicManager createTopicManager = null;
133 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
135 Set<String> topics = createTopicManager.getTopics();
137 if (topics == null || true == topics.isEmpty()) {
138 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
139 return Either.right(cambriaErrorResponse);
142 return Either.left(topics);
144 } catch (IOException | GeneralSecurityException e) {
145 String methodName = new Object() {
146 }.getClass().getEnclosingMethod().getName();
148 CambriaErrorResponse cambriaErrorResponse = processError(e);
150 logger.debug("Failed to fetch topics from U-EB server", e);
151 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics");
153 return Either.right(cambriaErrorResponse);
155 if (createTopicManager != null) {
156 createTopicManager.close();
163 * process the error message from Cambria client.
165 * set Cambria status and http code in case we succeed to fetch it
169 private CambriaErrorResponse processError(Exception e) {
171 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
173 Integer httpCode = processMessageException(e.getMessage());
175 if (httpCode != null) {
176 cambriaErrorResponse.setHttpCode(httpCode);
177 switch (httpCode.intValue()) {
180 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
183 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
186 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
189 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
193 boolean found = false;
194 Throwable throwable = e.getCause();
195 if (throwable != null) {
196 String message = throwable.getMessage();
198 Throwable cause = throwable.getCause();
201 Class<?> clazz = cause.getClass();
202 String className = clazz.getName();
203 if (className.endsWith("UnknownHostException")) {
204 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
205 cambriaErrorResponse.addVariable(message);
211 if (false == found) {
212 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
213 cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
217 return cambriaErrorResponse;
221 * write the error to the log
223 * @param cambriaErrorResponse
224 * @param errorMessage
226 * @param operationDesc
228 private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) {
230 String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode());
232 switch (cambriaErrorResponse.getOperationStatus()) {
233 case UNKNOWN_HOST_ERROR:
234 String hostname = cambriaErrorResponse.getVariables().get(0);
235 BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
237 case AUTHENTICATION_ERROR:
238 BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
240 case CONNNECTION_ERROR:
241 BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
244 case INTERNAL_SERVER_ERROR:
245 BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
254 * create a topic if it does not exists in the topicsList
257 * - list of U-EB servers
262 * @param partitionCount
263 * @param replicationCount
266 public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
268 CambriaTopicManager createTopicManager = null;
271 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey));
273 createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
275 } catch (HttpException | IOException | GeneralSecurityException e) {
277 logger.debug("Failed to create topic {}", topicName, e);
278 String methodName = new Object() {
279 }.getClass().getEnclosingMethod().getName();
281 CambriaErrorResponse cambriaErrorResponse = processError(e);
283 if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
284 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic");
287 return cambriaErrorResponse;
290 if (createTopicManager != null) {
291 createTopicManager.close();
294 return new CambriaErrorResponse(CambriaOperationStatus.OK);
298 public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
299 CambriaTopicManager createTopicManager = null;
301 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
303 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
304 createTopicManager.revokeProducer(topicName, subscriberApiKey);
306 createTopicManager.revokeConsumer(topicName, subscriberApiKey);
309 } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
310 logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
311 String methodName = new Object() {
312 }.getClass().getEnclosingMethod().getName();
314 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
316 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
317 return cambriaErrorResponse;
319 } catch (HttpException | IOException e) {
320 logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
321 String methodName = new Object() {
322 }.getClass().getEnclosingMethod().getName();
324 CambriaErrorResponse cambriaErrorResponse = processError(e);
326 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase());
328 return cambriaErrorResponse;
330 if (createTopicManager != null) {
331 createTopicManager.close();
335 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
336 return cambriaErrorResponse;
341 * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
344 * @param managerApiKey
345 * @param managerSecretKey
346 * @param subscriberApiKey
347 * @param subscriberTypeEnum
351 public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
353 CambriaTopicManager createTopicManager = null;
355 createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
357 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
358 createTopicManager.allowProducer(topicName, subscriberApiKey);
360 createTopicManager.allowConsumer(topicName, subscriberApiKey);
363 } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
364 logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
365 String methodName = new Object() {
366 }.getClass().getEnclosingMethod().getName();
368 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
370 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
371 return cambriaErrorResponse;
373 } catch (HttpException | IOException e) {
374 logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
375 String methodName = new Object() {
376 }.getClass().getEnclosingMethod().getName();
378 CambriaErrorResponse cambriaErrorResponse = processError(e);
380 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase());
382 return cambriaErrorResponse;
384 if (createTopicManager != null) {
385 createTopicManager.close();
389 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
390 return cambriaErrorResponse;
394 * create and retrieve a Cambria Consumer for a specific topic
401 * @param consumerGroup
406 public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
408 CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHosts(hostSet).waitAtServer(timeoutMS).build();
409 consumer.setApiCredentials(apiKey, secretKey);
413 public void closeConsumer(CambriaConsumer consumer) {
415 if (consumer != null) {
422 * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
424 * @param topicConsumer
427 public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
430 Iterable<String> messages = topicConsumer.fetch();
431 if (messages == null) {
432 messages = new ArrayList<String>();
434 return Either.left(messages);
436 } catch (IOException e) {
437 String methodName = new Object() {
438 }.getClass().getEnclosingMethod().getName();
440 CambriaErrorResponse cambriaErrorResponse = processError(e);
442 logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
443 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic");
445 return Either.right(cambriaErrorResponse);
447 } catch (Exception e) {
448 logger.debug("Failed to fetch from U-EB topic", e);
449 String methodName = new Object() {
450 }.getClass().getEnclosingMethod().getName();
452 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
454 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
455 return Either.right(cambriaErrorResponse);
460 * Publish notification message to a given queue
463 * @param uebPublicKey
464 * @param uebSecretKey
469 public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
471 CambriaBatchingPublisher createSimplePublisher = null;
475 String json = gson.toJson(data);
476 logger.trace("Before sending notification data {} to topic {}", json, topicName);
478 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
479 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
481 int result = createSimplePublisher.send(PARTITION_KEY, json);
484 Thread.sleep(1 * 1000);
485 } catch (InterruptedException e) {
486 logger.debug("Failed during sleep after sending the message.", e);
489 logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
491 CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
495 } catch (IOException | GeneralSecurityException e) {
496 logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
498 String methodName = new Object() {
499 }.getClass().getEnclosingMethod().getName();
501 CambriaErrorResponse cambriaErrorResponse = processError(e);
503 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, SEND_NOTIFICATION);
505 return cambriaErrorResponse;
507 if (createSimplePublisher != null) {
508 logger.debug("Before closing publisher");
509 createSimplePublisher.close();
510 logger.debug("After closing publisher");
515 public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
517 CambriaBatchingPublisher createSimplePublisher = null;
519 CambriaErrorResponse response = null;
522 String json = gson.toJson(data);
523 logger.debug("Before sending notification data {} to topic {}", json, topicName);
525 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
526 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
528 int result = createSimplePublisher.send(PARTITION_KEY, json);
532 } catch (InterruptedException e) {
533 logger.debug("Failed during sleep after sending the message.", e);
536 logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
538 } catch (IOException | GeneralSecurityException e) {
539 logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
541 String methodName = new Object() {
542 }.getClass().getEnclosingMethod().getName();
544 response = processError(e);
546 writeErrorToLog(response, e.getMessage(), methodName, SEND_NOTIFICATION);
552 logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
554 List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
555 if (messagesInQ != null && false == messagesInQ.isEmpty()) {
556 logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
557 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
558 String methodName = new Object() {
559 }.getClass().getEnclosingMethod().getName();
560 writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION);
562 logger.debug("No message left in the queue after closing cambria publisher");
563 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
565 } catch (IOException | InterruptedException e) {
566 logger.debug("Failed to close cambria publisher", e);
567 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
568 String methodName = new Object() {
569 }.getClass().getEnclosingMethod().getName();
570 writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION);
572 logger.debug("After closing publisher");
578 public CambriaErrorResponse getApiKey(String server, String apiKey) {
580 CambriaErrorResponse response = null;
582 List<String> hostSet = new ArrayList<>();
585 CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
586 createIdentityManager.getApiKey(apiKey);
588 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
590 } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
591 logger.debug("Failed to fetch api key {} from server {}", apiKey, server, e);
593 response = processError(e);
600 public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) {
601 Either<ApiCredential, CambriaErrorResponse> result;
604 CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
606 String description = String.format("ASDC Key for %s", CONSUMER_ID);
607 ApiCredential credential = createIdentityManager.createApiKey("", description);
608 createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
609 result = Either.left(credential);
611 } catch (Exception e) {
612 logger.debug("Failed to create ueb keys for servers {}",hostSet, e);
614 result = Either.right(processError(e));
622 <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
623 return (T)client.build();