2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.security.GeneralSecurityException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import java.util.regex.Matcher;
33 import java.util.regex.Pattern;
35 import org.apache.http.HttpStatus;
36 import org.openecomp.sdc.be.config.BeEcompErrorManager;
37 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
38 import org.openecomp.sdc.common.config.EcompErrorName;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import com.att.nsa.apiClient.http.HttpException;
43 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
44 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
45 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
46 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
47 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
48 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
49 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
50 import com.att.nsa.cambria.client.CambriaConsumer;
51 import com.att.nsa.cambria.client.CambriaIdentityManager;
52 import com.att.nsa.cambria.client.CambriaPublisher.message;
53 import com.att.nsa.cambria.client.CambriaTopicManager;
54 import com.google.gson.Gson;
56 import fj.data.Either;
58 public class CambriaHandler {
60 private static Logger logger = LoggerFactory.getLogger(CambriaHandler.class.getName());
62 public static String PARTITION_KEY = "asdc" + "aa";
64 private Gson gson = new Gson();
67 * process the response error from Cambria client
72 private Integer processMessageException(String message) {
74 String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" };
76 Integer result = checkPattern(patterns[0], message, 2);
80 result = checkPattern(patterns[1], message, 2);
87 * check whether the message has a match with a given pattern inside it
94 private Integer checkPattern(String patternStr, String message, int groupIndex) {
95 Integer result = null;
97 Pattern pattern = Pattern.compile(patternStr);
98 Matcher matcher = pattern.matcher(message);
99 boolean find = matcher.find();
101 String httpCode = matcher.group(groupIndex);
102 if (httpCode != null) {
104 result = Integer.valueOf(httpCode);
105 } catch (NumberFormatException e) {
106 logger.debug("Failed to parse http code {}", httpCode);
114 * retrieve all topics from U-EB server
119 public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
121 CambriaTopicManager createTopicManager = null;
124 createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).build();
125 Set<String> topics = createTopicManager.getTopics();
127 if (topics == null || true == topics.isEmpty()) {
128 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
129 return Either.right(cambriaErrorResponse);
132 return Either.left(topics);
134 } catch (IOException | GeneralSecurityException e) {
135 String methodName = new Object() {
136 }.getClass().getEnclosingMethod().getName();
138 CambriaErrorResponse cambriaErrorResponse = processError(e);
140 logger.debug("Failed to fetch topics from U-EB server", e);
141 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics");
143 return Either.right(cambriaErrorResponse);
145 if (createTopicManager != null) {
146 createTopicManager.close();
153 * process the error message from Cambria client.
155 * set Cambria status and http code in case we succeed to fetch it
157 * @param errorMessage
160 private CambriaErrorResponse processError(Exception e) {
162 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
164 Integer httpCode = processMessageException(e.getMessage());
166 if (httpCode != null) {
167 cambriaErrorResponse.setHttpCode(httpCode);
168 switch (httpCode.intValue()) {
171 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
174 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
177 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
180 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
184 boolean found = false;
185 Throwable throwable = e.getCause();
186 if (throwable != null) {
187 String message = throwable.getMessage();
189 Throwable cause = throwable.getCause();
192 Class<?> clazz = cause.getClass();
193 String className = clazz.getName();
194 if (className.endsWith("UnknownHostException")) {
195 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
196 cambriaErrorResponse.addVariable(message);
202 if (false == found) {
203 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
204 cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
208 return cambriaErrorResponse;
212 * write the error to the log
214 * @param cambriaErrorResponse
215 * @param errorMessage
217 * @param operationDesc
219 private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) {
221 String httpCode = (cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode()));
223 switch (cambriaErrorResponse.getOperationStatus()) {
224 case UNKNOWN_HOST_ERROR:
225 String hostname = cambriaErrorResponse.getVariables().get(0);
226 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebUnkownHostError, methodName, hostname);
227 BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
229 case AUTHENTICATION_ERROR:
230 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebAuthenticationError, methodName, httpCode);
231 BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
233 case CONNNECTION_ERROR:
234 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebConnectionError, methodName, httpCode);
235 BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
238 case INTERNAL_SERVER_ERROR:
239 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, methodName, operationDesc);
240 BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
248 * create a topic if it does not exists in the topicsList
251 * - list of U-EB servers
255 * - list of exists topics
258 * @param partitionCount
259 * @param replicationCount
262 public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
264 CambriaTopicManager createTopicManager = null;
266 createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build();
267 createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
269 } catch (GeneralSecurityException | HttpException | IOException e) {
270 logger.debug("Failed to create topic {}", topicName, e);
271 String methodName = new Object() {
272 }.getClass().getEnclosingMethod().getName();
274 CambriaErrorResponse cambriaErrorResponse = processError(e);
276 if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
277 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic");
280 return cambriaErrorResponse;
283 if (createTopicManager != null) {
284 createTopicManager.close();
287 return new CambriaErrorResponse(CambriaOperationStatus.OK);
290 public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
291 CambriaTopicManager createTopicManager = null;
293 createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
295 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
296 createTopicManager.revokeProducer(topicName, subscriberApiKey);
298 createTopicManager.revokeConsumer(topicName, subscriberApiKey);
301 } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
302 logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
303 String methodName = new Object() {
304 }.getClass().getEnclosingMethod().getName();
306 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
307 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
309 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
310 return cambriaErrorResponse;
312 } catch (HttpException | IOException e) {
313 logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
314 String methodName = new Object() {
315 }.getClass().getEnclosingMethod().getName();
317 CambriaErrorResponse cambriaErrorResponse = processError(e);
319 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase());
321 return cambriaErrorResponse;
323 if (createTopicManager != null) {
324 createTopicManager.close();
328 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
329 return cambriaErrorResponse;
334 * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
338 * @param managerApiKey
339 * @param managerSecretKey
340 * @param subscriberApiKey
341 * @param subscriberTypeEnum
344 public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
346 CambriaTopicManager createTopicManager = null;
348 createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
350 if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
351 createTopicManager.allowProducer(topicName, subscriberApiKey);
353 createTopicManager.allowConsumer(topicName, subscriberApiKey);
356 } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
357 logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
358 String methodName = new Object() {
359 }.getClass().getEnclosingMethod().getName();
361 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
362 BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
364 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
365 return cambriaErrorResponse;
367 } catch (HttpException | IOException e) {
368 logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
369 String methodName = new Object() {
370 }.getClass().getEnclosingMethod().getName();
372 CambriaErrorResponse cambriaErrorResponse = processError(e);
374 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase());
376 return cambriaErrorResponse;
378 if (createTopicManager != null) {
379 createTopicManager.close();
383 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
384 return cambriaErrorResponse;
388 * create and retrieve a Cambria Consumer for a specific topic
395 * @param consumerGroup
400 public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
402 CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
403 consumer.setApiCredentials(apiKey, secretKey);
407 public void closeConsumer(CambriaConsumer consumer) {
409 if (consumer != null) {
416 * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
418 * @param topicConsumer
421 public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
424 Iterable<String> messages = topicConsumer.fetch();
425 if (messages == null) {
426 messages = new ArrayList<String>();
428 return Either.left(messages);
430 } catch (IOException e) {
431 String methodName = new Object() {
432 }.getClass().getEnclosingMethod().getName();
434 CambriaErrorResponse cambriaErrorResponse = processError(e);
436 logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
437 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic");
439 return Either.right(cambriaErrorResponse);
441 } catch (Exception e) {
442 logger.debug("Failed to fetch from U-EB topic", e);
443 String methodName = new Object() {
444 }.getClass().getEnclosingMethod().getName();
446 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
447 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
449 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
450 return Either.right(cambriaErrorResponse);
455 * Publish notification message to a given queue
458 * @param uebPublicKey
459 * @param uebSecretKey
464 public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
466 CambriaBatchingPublisher createSimplePublisher = null;
470 String json = gson.toJson(data);
471 logger.trace("Before sending notification data {} to topic {}", json, topicName);
473 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
474 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
476 int result = createSimplePublisher.send(PARTITION_KEY, json);
479 Thread.sleep(1 * 1000);
480 } catch (InterruptedException e) {
481 logger.debug("Failed during sleep after sending the message.", e);
484 logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
486 CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
490 } catch (IOException | GeneralSecurityException e) {
491 logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
493 String methodName = new Object() {
494 }.getClass().getEnclosingMethod().getName();
496 CambriaErrorResponse cambriaErrorResponse = processError(e);
498 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "send notification");
500 return cambriaErrorResponse;
502 if (createSimplePublisher != null) {
503 logger.debug("Before closing publisher");
504 createSimplePublisher.close();
505 logger.debug("After closing publisher");
510 private String convertListToString(List<String> list) {
511 StringBuilder builder = new StringBuilder();
514 for (int i = 0; i < list.size(); i++) {
515 builder.append(list.get(i));
516 if (i < list.size() - 1) {
522 return builder.toString();
525 public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
527 CambriaBatchingPublisher createSimplePublisher = null;
529 CambriaErrorResponse response = null;
532 String json = gson.toJson(data);
533 logger.debug("Before sending notification data {} to topic {}", json, topicName);
535 createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
536 createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
538 int result = createSimplePublisher.send(PARTITION_KEY, json);
542 } catch (InterruptedException e) {
543 logger.debug("Failed during sleep after sending the message.", e);
546 logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
548 } catch (IOException | GeneralSecurityException e) {
549 logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
551 String methodName = new Object() {
552 }.getClass().getEnclosingMethod().getName();
554 response = processError(e);
556 writeErrorToLog(response, e.getMessage(), methodName, "send notification");
562 logger.debug("Before closing publisher. Maximum timeout is {} seconds.", waitBeforeCloseTimeout);
564 List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
565 if (messagesInQ != null && false == messagesInQ.isEmpty()) {
566 logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
567 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
568 String methodName = new Object() {
569 }.getClass().getEnclosingMethod().getName();
570 writeErrorToLog(response, "closing publisher returned non sent messages", methodName, "send notification");
572 logger.debug("No message left in the queue after closing cambria publisher");
573 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
575 } catch (IOException | InterruptedException e) {
576 logger.debug("Failed to close cambria publisher", e);
577 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
578 String methodName = new Object() {
579 }.getClass().getEnclosingMethod().getName();
580 writeErrorToLog(response, "closing publisher returned non sent messages", methodName, "send notification");
582 logger.debug("After closing publisher");
588 public CambriaErrorResponse getApiKey(String server, String apiKey) {
590 CambriaErrorResponse response = null;
592 List<String> hostSet = new ArrayList<>();
594 CambriaIdentityManager createIdentityManager = null;
596 createIdentityManager = new IdentityManagerBuilder().usingHosts(hostSet).build();
597 createIdentityManager.getApiKey(apiKey);
598 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
600 } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
601 logger.debug("Failed to fetch api key {} from server ", apiKey, server, e);
603 response = processError(e);