2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import com.att.nsa.apiClient.credentials.ApiCredential;
24 import com.att.nsa.apiClient.http.HttpException;
25 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
26 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
27 import com.att.nsa.cambria.client.CambriaClient;
28 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
29 import com.att.nsa.cambria.client.CambriaClientBuilders;
30 import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder;
31 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
32 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
33 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
34 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
35 import com.att.nsa.cambria.client.CambriaConsumer;
36 import com.att.nsa.cambria.client.CambriaIdentityManager;
37 import com.att.nsa.cambria.client.CambriaPublisher.message;
38 import com.att.nsa.cambria.client.CambriaTopicManager;
39 import com.google.common.annotations.VisibleForTesting;
40 import com.google.gson.Gson;
41 import fj.data.Either;
42 import org.apache.http.HttpStatus;
43 import org.openecomp.sdc.be.config.BeEcompErrorManager;
44 import org.openecomp.sdc.be.config.ConfigurationManager;
45 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
46 import org.openecomp.sdc.common.log.wrappers.Logger;
47 import org.springframework.stereotype.Component;
49 import java.io.IOException;
50 import java.net.MalformedURLException;
51 import java.security.GeneralSecurityException;
52 import java.util.ArrayList;
53 import java.util.Collection;
54 import java.util.List;
56 import java.util.regex.Matcher;
57 import java.util.regex.Pattern;
59 import static java.util.concurrent.TimeUnit.SECONDS;
// Spring component registered under the bean name "cambriaHandler"; implements ICambriaHandler.
61 @Component("cambriaHandler")
62 public class CambriaHandler implements ICambriaHandler{
// Class-wide logger, named after this class.
64 private static final Logger log = Logger.getLogger(CambriaHandler.class.getName());
// Passed as the first argument of every publisher send(...) call in this class.
65 private static final String PARTITION_KEY = "asdc" + "aa";
// Operation description handed to writeErrorToLog by both sendNotification variants.
66 private static final String SEND_NOTIFICATION = "send notification";
// Read once, when the class is initialised, from the distribution-engine configuration;
// used by createUebKeys to build the description of a newly created API key.
67 private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager()
68 .getDistributionEngineConfiguration()
69 .getDistributionStatusTopic()
// NOTE(review): the final call of the chain above is not visible in this listing - confirm which
// property of the distribution status topic is read.
// Configuration flag, read once when the class is initialised; when true, the authenticated
// topic-manager builder is switched to HTTPS (see createTopicManagerBuilder).
71 private static final boolean USE_HTTPS_WITH_DMAAP = ConfigurationManager.getConfigurationManager()
72 .getDistributionEngineConfiguration()
73 .isUseHttpsWithDmaap();
// Serialises notification data to JSON before it is published.
74 private final Gson gson = new Gson();
78 * process the response error from Cambria client
83 private Integer processMessageException(String message) {
85 String[] patterns = {"(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)"};
87 Integer result = checkPattern(patterns[0], message, 2);
91 result = checkPattern(patterns[1], message, 2);
98 * check whether the message has a match with a given pattern inside it
105 private Integer checkPattern(String patternStr, String message, int groupIndex) {
106 Integer result = null;
108 Pattern pattern = Pattern.compile(patternStr);
109 Matcher matcher = pattern.matcher(message);
110 boolean find = matcher.find();
112 String httpCode = matcher.group(groupIndex);
113 if (httpCode != null) {
115 result = Integer.valueOf(httpCode);
117 catch (NumberFormatException e) {
118 log.debug("Failed to parse http code {}", httpCode);
126 * retrieve all topics from U-EB server
132 public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
134 CambriaTopicManager createTopicManager = null;
137 createTopicManager = buildCambriaClient(createTopicManagerBuilder(hostSet));
139 Set<String> topics = createTopicManager.getTopics();
141 if (topics == null || topics.isEmpty()) {
142 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
143 return Either.right(cambriaErrorResponse);
146 return Either.left(topics);
149 catch (IOException | GeneralSecurityException e) {
151 CambriaErrorResponse cambriaErrorResponse = processError(e);
153 log.debug("Failed to fetch topics from U-EB server", e);
154 writeErrorToLog(cambriaErrorResponse, "getTopics", "get topics");
156 return Either.right(cambriaErrorResponse);
158 if (createTopicManager != null) {
159 createTopicManager.close();
// Translates an exception raised by the Cambria client into a CambriaErrorResponse.
// NOTE(review): several short lines of this method (case labels, braces, else/if guards) are not
// visible in this listing; comments marked "presumably" describe what the visible lines suggest
// and need confirming against the complete file.
166 * process the error message from Cambria client.
168 * set Cambria status and http code in case we succeed to fetch it
172 private CambriaErrorResponse processError(Exception e) {
174 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
// Try to recover an HTTP status code from the exception text.
// NOTE(review): e.getMessage() can be null - confirm processMessageException tolerates that.
176 Integer httpCode = processMessageException(e.getMessage());
// A code was found: record it and derive the operation status from it.
178 if (httpCode != null) {
179 cambriaErrorResponse.setHttpCode(httpCode);
180 switch (httpCode.intValue()) {
// NOTE(review): the case labels selecting each of the four statuses below are not visible -
// confirm which HTTP codes map to which status.
183 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
186 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
189 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
192 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
// Presumably the branch taken when no code was found: look for an unknown-host failure in the
// cause chain.
197 boolean found = false;
198 Throwable throwable = e.getCause();
199 if (throwable != null) {
// The message comes from the direct cause, while the class-name check below is applied to that
// cause's own cause.
200 String message = throwable.getMessage();
202 Throwable cause = throwable.getCause();
205 Class<?> clazz = cause.getClass();
206 String className = clazz.getName();
// Matched by class-name suffix rather than by type.
207 if (className.endsWith("UnknownHostException")) {
208 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
209 cambriaErrorResponse.addVariable(message);
// Presumably the fallback when no unknown-host cause was detected: report a connection error
// with HTTP 500.
216 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
217 cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
221 return cambriaErrorResponse;
225 * write the error to the log
226 * @param cambriaErrorResponse
228 * @param operationDesc
230 private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String methodName, String operationDesc) {
232 String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode());
234 switch (cambriaErrorResponse.getOperationStatus()) {
235 case UNKNOWN_HOST_ERROR:
236 BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
238 case AUTHENTICATION_ERROR:
239 BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
241 case CONNNECTION_ERROR:
242 BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
244 case INTERNAL_SERVER_ERROR:
245 BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
254 * create a topic if it does not exists in the topicsList
256 * @param hostSet - list of U-EB servers
259 * @param topicName - topic to create
260 * @param partitionCount
261 * @param replicationCount
265 public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
267 CambriaTopicManager createTopicManager = null;
270 AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, apiKey, secretKey);
271 createTopicManager = buildCambriaClient(clientBuilder);
273 createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
276 catch (HttpException | IOException | GeneralSecurityException e) {
278 log.debug("Failed to create topic {}", topicName, e);
280 CambriaErrorResponse cambriaErrorResponse = processError(e);
282 if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
283 writeErrorToLog(cambriaErrorResponse, "createTopic", "create topic");
286 return cambriaErrorResponse;
289 if (createTopicManager != null) {
290 createTopicManager.close();
293 return new CambriaErrorResponse(CambriaOperationStatus.OK);
// Revokes a subscriber key's producer or consumer permission on a topic, using a topic manager
// authenticated with the manager credentials.
// NOTE(review): some short lines (try/else/finally, closing braces, statement tails) are not
// visible in this listing.
297 public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
298 String methodName = "unRegisterFromTopic";
299 CambriaTopicManager createTopicManager = null;
301 AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, managerApiKey, managerSecretKey);
303 createTopicManager = buildCambriaClient(clientBuilder);
// PRODUCER subscribers lose their producer permission; presumably every other type reaches the
// consumer revocation below (the else line is not visible).
305 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
306 createTopicManager.revokeProducer(topicName, subscriberApiKey);
309 createTopicManager.revokeConsumer(topicName, subscriberApiKey);
// Object-not-found is reported on its own and answered with OBJECT_NOT_FOUND / HTTP 404.
313 catch (HttpObjectNotFoundException e) {
// NOTE(review): the tail of this log call is not visible. It logs managerApiKey, not
// subscriberApiKey - confirm that this is the intended key.
314 log.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
316 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
318 return new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
// Every other failure is classified by processError and written to the error log.
321 catch (HttpException | IOException | GeneralSecurityException e) {
// NOTE(review): this message always says "as producer", whatever subscriberTypeEnum is -
// confirm whether that is intended.
322 log.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
323 CambriaErrorResponse cambriaErrorResponse = processError(e);
// NOTE(review): the tail of this expression is not visible.
325 writeErrorToLog(cambriaErrorResponse, methodName, "unregister from topic as " + subscriberTypeEnum
329 return cambriaErrorResponse;
// Close the topic manager if one was built.
331 if (createTopicManager != null) {
332 createTopicManager.close();
// Success response: OK / HTTP 200.
336 return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
339 private AbstractAuthenticatedManagerBuilder<CambriaTopicManager> createTopicManagerBuilder(Collection<String> hostSet, String managerApiKey, String managerSecretKey) {
340 AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet)
341 .authenticatedBy(managerApiKey, managerSecretKey);
342 if (USE_HTTPS_WITH_DMAAP) {
343 clientBuilder = clientBuilder.usingHttps();
346 return clientBuilder;
349 private AbstractAuthenticatedManagerBuilder<CambriaTopicManager> createTopicManagerBuilder(Collection<String> hostSet) {
350 return new TopicManagerBuilder().usingHosts(hostSet);
// Grants a subscriber key producer or consumer permission on a topic, using a topic manager
// authenticated with the manager credentials.
// NOTE(review): some short lines (try/else/finally, closing braces, statement tails) are not
// visible in this listing.
354 * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
357 * @param managerApiKey
358 * @param managerSecretKey
359 * @param subscriberApiKey
360 * @param subscriberTypeEnum
365 public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
367 String methodName = "registerToTopic";
368 CambriaTopicManager createTopicManager = null;
370 AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, managerApiKey, managerSecretKey);
371 createTopicManager = buildCambriaClient(clientBuilder);
// PRODUCER subscribers are allowed to produce; presumably every other type reaches the consumer
// grant below (the else line is not visible).
373 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
374 createTopicManager.allowProducer(topicName, subscriberApiKey);
377 createTopicManager.allowConsumer(topicName, subscriberApiKey);
// Object-not-found is reported on its own and answered with OBJECT_NOT_FOUND / HTTP 404.
381 catch (HttpObjectNotFoundException e) {
// NOTE(review): the tail of this log call is not visible. It logs managerApiKey, not
// subscriberApiKey - confirm that this is the intended key.
382 log.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
385 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
387 return new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
// Every other failure is classified by processError and written to the error log.
390 catch (HttpException | IOException | GeneralSecurityException e) {
// NOTE(review): the tail of this log call is not visible.
391 log.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
393 CambriaErrorResponse cambriaErrorResponse = processError(e);
// NOTE(review): the tail of this expression is not visible.
395 writeErrorToLog(cambriaErrorResponse, methodName, "register to topic as " + subscriberTypeEnum
399 return cambriaErrorResponse;
// Close the topic manager if one was built.
401 if (createTopicManager != null) {
402 createTopicManager.close();
// Success response: OK / HTTP 200.
406 return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
// Builds a Cambria consumer that authenticates with the given key pair, is known as
// (consumerGroup, consumerId) and uses timeoutMS as its wait-at-server value.
// NOTE(review): the builder calls that would use hostSet and topicName, the final build call and
// the return statement are not visible in this listing.
410 * create and retrieve a Cambria Consumer for a specific topic
417 * @param consumerGroup
423 public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
425 CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey)
426 .knownAs(consumerGroup, consumerId)
429 .waitAtServer(timeoutMS)
// The same credentials are applied again, directly on the consumer.
431 consumer.setApiCredentials(apiKey, secretKey);
435 public void closeConsumer(CambriaConsumer consumer) {
437 if (consumer != null) {
444 * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
446 * @param topicConsumer
450 public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
452 String methodName = "fetchFromTopic";
454 Iterable<String> messages = topicConsumer.fetch();
455 if (messages == null) {
456 messages = new ArrayList<>();
458 return Either.left(messages);
461 catch (IOException e) {
462 CambriaErrorResponse cambriaErrorResponse = processError(e);
463 log.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
464 writeErrorToLog(cambriaErrorResponse, methodName, "get messages from topic");
465 return Either.right(cambriaErrorResponse);
468 catch (Exception e) {
469 log.debug("Failed to fetch from U-EB topic", e);
470 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
471 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
472 return Either.right(cambriaErrorResponse);
// Serialises the notification data to JSON and publishes it to the topic through a publisher
// built for the given servers, then closes the publisher.
// NOTE(review): some short lines (try/finally, closing braces, and the statement guarded by the
// InterruptedException catch) are not visible in this listing.
477 * Publish notification message to a given queue
480 * @param uebPublicKey
481 * @param uebSecretKey
487 public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
489 CambriaBatchingPublisher createSimplePublisher = null;
493 String json = gson.toJson(data);
494 log.trace("Before sending notification data {} to topic {}", json, topicName);
// The credentials are applied to the publisher after it has been built.
496 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
497 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
// The value returned by send is only logged below; it is not checked.
499 int result = createSimplePublisher.send(PARTITION_KEY, json);
// Presumably guards a short pause after sending (the guarded statement is not visible); the
// interrupt flag is restored so callers can still observe the interruption.
504 catch (InterruptedException e) {
505 log.debug("Failed during sleep after sending the message.", e);
506 Thread.currentThread().interrupt();
509 log.debug("After sending notification data to topic {}. result is {}", topicName, result);
511 return new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
// Failures are classified by processError and written to the error log.
513 } catch (IOException | GeneralSecurityException e) {
514 log.debug("Failed to send notification {} to topic {} ", data, topicName, e);
516 CambriaErrorResponse cambriaErrorResponse = processError(e);
518 writeErrorToLog(cambriaErrorResponse, "sendNotification", SEND_NOTIFICATION);
520 return cambriaErrorResponse;
// Close the publisher if one was built.
523 if (createSimplePublisher != null) {
524 log.debug("Before closing publisher");
525 createSimplePublisher.close();
526 log.debug("After closing publisher");
// Publishes the notification data as JSON, then closes the publisher with a bounded wait and
// treats any message still left in its queue as a failure.
// NOTE(review): some short lines (try/else, closing braces, return statements, and the statement
// guarded by the first InterruptedException catch) are not visible in this listing.
531 public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
532 String methodName = "sendNotificationAndClose";
533 CambriaBatchingPublisher createSimplePublisher;
534 CambriaErrorResponse response;
537 String json = gson.toJson(data);
538 log.debug("Before sending notification data {} to topic {}", json, topicName);
// The credentials are applied to the publisher after it has been built.
540 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
541 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
// The value returned by send is only logged below; it is not checked.
543 int result = createSimplePublisher.send(PARTITION_KEY, json);
// Presumably guards a short pause after sending (the guarded statement is not visible); the
// interrupt flag is restored so callers can still observe the interruption.
548 catch (InterruptedException e) {
549 log.debug("Failed during sleep after sending the message.", e);
550 Thread.currentThread().interrupt();
553 log.debug("After sending notification data to topic {}. result is {}", topicName, result);
// Build or send failures are classified by processError and written to the error log.
// NOTE(review): no close() call is visible on this failure path - confirm the publisher is not
// left open when send throws after a successful build.
556 catch (IOException | GeneralSecurityException e) {
557 log.debug("Failed to send notification {} to topic {} ", data, topicName, e);
560 response = processError(e);
562 writeErrorToLog(response, methodName, SEND_NOTIFICATION);
568 log.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
// close(timeout, unit) returns a list of messages; a non-empty list is reported below as
// INTERNAL_SERVER_ERROR / 500.
570 List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, SECONDS);
571 if (messagesInQ != null && !messagesInQ.isEmpty()) {
572 log.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
573 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
574 writeErrorToLog(response, methodName, SEND_NOTIFICATION);
// Presumably the else branch: nothing was left unsent, so the call succeeded.
577 log.debug("No message left in the queue after closing cambria publisher");
578 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
// An interrupted close restores the interrupt flag and is reported as INTERNAL_SERVER_ERROR / 500.
581 catch (InterruptedException e) {
582 log.debug("InterruptedException while closing cambria publisher", e);
583 Thread.currentThread().interrupt();
584 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
585 writeErrorToLog(response, methodName, SEND_NOTIFICATION);
// An I/O failure while closing is likewise reported as INTERNAL_SERVER_ERROR / 500.
587 catch (IOException e) {
588 log.debug("Failed to close cambria publisher", e);
589 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
590 writeErrorToLog(response, methodName, SEND_NOTIFICATION);
592 log.debug("After closing publisher");
598 public CambriaErrorResponse getApiKey(String server, String apiKey) {
600 CambriaErrorResponse response;
601 List<String> hostSet = new ArrayList<>();
604 CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
605 createIdentityManager.getApiKey(apiKey);
607 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
610 catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
611 log.debug("Failed to fetch api key {} from server {}", apiKey, server, e);
613 response = processError(e);
620 public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) {
621 Either<ApiCredential, CambriaErrorResponse> result;
624 CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
626 String description = String.format("ASDC Key for %s", CONSUMER_ID);
627 ApiCredential credential = createIdentityManager.createApiKey("", description);
628 createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
629 result = Either.left(credential);
632 catch (Exception e) {
633 log.debug("Failed to create ueb keys for servers {}", hostSet, e);
635 result = Either.right(processError(e));
643 <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<T> client) throws MalformedURLException, GeneralSecurityException {
644 return client.build();