re base code
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / CambriaHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import com.att.nsa.apiClient.credentials.ApiCredential;
24 import com.att.nsa.apiClient.http.HttpException;
25 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
26 import com.att.nsa.cambria.client.*;
27 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
28 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
29 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
30 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
31 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
32 import com.att.nsa.cambria.client.CambriaPublisher.message;
33 import com.google.common.annotations.VisibleForTesting;
34 import com.google.gson.Gson;
35 import fj.data.Either;
36 import org.apache.http.HttpStatus;
37 import org.openecomp.sdc.be.config.BeEcompErrorManager;
38 import org.openecomp.sdc.be.config.ConfigurationManager;
39 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
40 import org.openecomp.sdc.common.log.wrappers.Logger;
41 import org.springframework.stereotype.Component;
42
43 import java.io.IOException;
44 import java.net.MalformedURLException;
45 import java.security.GeneralSecurityException;
46 import java.util.ArrayList;
47 import java.util.Collection;
48 import java.util.List;
49 import java.util.Set;
50 import java.util.regex.Matcher;
51 import java.util.regex.Pattern;
52
53 import static java.util.concurrent.TimeUnit.SECONDS;
54
55 @Component("cambriaHandler")
56 public class CambriaHandler {
57
58     private static final Logger log = Logger.getLogger(CambriaHandler.class.getName());
59     private static final String PARTITION_KEY = "asdc" + "aa";
60     private static final String SEND_NOTIFICATION = "send notification";
61     private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager()
62                                                                   .getDistributionEngineConfiguration()
63                                                                   .getDistributionStatusTopic()
64                                                                   .getConsumerId();
65     private final Gson gson = new Gson();
66
67
68     /**
69      * process the response error from Cambria client
70      *
71      * @param message
72      * @return
73      */
74     private Integer processMessageException(String message) {
75
76         String[] patterns = {"(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)"};
77
78         Integer result = checkPattern(patterns[0], message, 2);
79         if (result != null) {
80             return result;
81         }
82         result = checkPattern(patterns[1], message, 2);
83
84         return result;
85
86     }
87
88     /**
89      * check whether the message has a match with a given pattern inside it
90      *
91      * @param patternStr
92      * @param message
93      * @param groupIndex
94      * @return
95      */
96     private Integer checkPattern(String patternStr, String message, int groupIndex) {
97         Integer result = null;
98
99         Pattern pattern = Pattern.compile(patternStr);
100         Matcher matcher = pattern.matcher(message);
101         boolean find = matcher.find();
102         if (find) {
103             String httpCode = matcher.group(groupIndex);
104             if (httpCode != null) {
105                 try {
106                     result = Integer.valueOf(httpCode);
107                 }
108                 catch (NumberFormatException e) {
109                     log.debug("Failed to parse http code {}", httpCode);
110                 }
111             }
112         }
113         return result;
114     }
115
116     /**
117      * retrieve all topics from U-EB server
118      *
119      * @param hostSet
120      * @return
121      */
122     public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
123
124         CambriaTopicManager createTopicManager = null;
125         try {
126
127             createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
128
129             Set<String> topics = createTopicManager.getTopics();
130
131             if (topics == null || topics.isEmpty()) {
132                 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
133                 return Either.right(cambriaErrorResponse);
134             }
135
136             return Either.left(topics);
137
138         }
139         catch (IOException | GeneralSecurityException e) {
140
141             CambriaErrorResponse cambriaErrorResponse = processError(e);
142
143             log.debug("Failed to fetch topics from U-EB server", e);
144             writeErrorToLog(cambriaErrorResponse, "getTopics", "get topics");
145
146             return Either.right(cambriaErrorResponse);
147         } finally {
148             if (createTopicManager != null) {
149                 createTopicManager.close();
150             }
151         }
152
153     }
154
155     /**
156      * process the error message from Cambria client.
157      * <p>
158      * set Cambria status and http code in case we succeed to fetch it
159      *
160      * @return
161      */
162     private CambriaErrorResponse processError(Exception e) {
163
164         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
165
166         Integer httpCode = processMessageException(e.getMessage());
167
168         if (httpCode != null) {
169             cambriaErrorResponse.setHttpCode(httpCode);
170             switch (httpCode.intValue()) {
171
172                 case 401:
173                     cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
174                     break;
175                 case 409:
176                     cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
177                     break;
178                 case 500:
179                     cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
180                     break;
181                 default:
182                     cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
183             }
184         }
185         else {
186
187             boolean found = false;
188             Throwable throwable = e.getCause();
189             if (throwable != null) {
190                 String message = throwable.getMessage();
191
192                 Throwable cause = throwable.getCause();
193
194                 if (cause != null) {
195                     Class<?> clazz = cause.getClass();
196                     String className = clazz.getName();
197                     if (className.endsWith("UnknownHostException")) {
198                         cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
199                         cambriaErrorResponse.addVariable(message);
200                         found = true;
201                     }
202                 }
203             }
204
205             if (!found) {
206                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
207                 cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
208             }
209         }
210
211         return cambriaErrorResponse;
212     }
213
214     /**
215      * write the error to the log
216      *  @param cambriaErrorResponse
217      * @param methodName
218      * @param operationDesc
219      */
220     private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String methodName, String operationDesc) {
221
222         String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode());
223
224         switch (cambriaErrorResponse.getOperationStatus()) {
225             case UNKNOWN_HOST_ERROR:
226                 BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
227                 break;
228             case AUTHENTICATION_ERROR:
229                 BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
230                 break;
231             case CONNNECTION_ERROR:
232                 BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
233                 break;
234             case INTERNAL_SERVER_ERROR:
235                 BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
236                 break;
237             default:
238                 break;
239         }
240
241     }
242
243     /**
244      * create a topic if it does not exists in the topicsList
245      *
246      * @param hostSet          - list of U-EB servers
247      * @param apiKey
248      * @param secretKey
249      * @param topicName        - topic to create
250      * @param partitionCount
251      * @param replicationCount
252      * @return
253      */
254     public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
255
256         CambriaTopicManager createTopicManager = null;
257         try {
258
259             createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
260                                                                              .authenticatedBy(apiKey, secretKey));
261
262             createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
263
264         }
265         catch (HttpException | IOException | GeneralSecurityException e) {
266
267             log.debug("Failed to create topic {}", topicName, e);
268
269             CambriaErrorResponse cambriaErrorResponse = processError(e);
270
271             if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
272                 writeErrorToLog(cambriaErrorResponse, "createTopic", "create topic");
273             }
274
275             return cambriaErrorResponse;
276
277         } finally {
278             if (createTopicManager != null) {
279                 createTopicManager.close();
280             }
281         }
282         return new CambriaErrorResponse(CambriaOperationStatus.OK);
283
284     }
285
286     public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
287         String methodName = "unRegisterFromTopic";
288         CambriaTopicManager createTopicManager = null;
289         try {
290             createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
291                                                                              .authenticatedBy(managerApiKey, managerSecretKey));
292
293             if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
294                 createTopicManager.revokeProducer(topicName, subscriberApiKey);
295             }
296             else {
297                 createTopicManager.revokeConsumer(topicName, subscriberApiKey);
298             }
299
300         }
301         catch (HttpObjectNotFoundException e) {
302             log.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
303                                                                                                                  .toLowerCase(), e);
304             BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
305
306             return new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
307
308         }
309         catch (HttpException | IOException | GeneralSecurityException e) {
310             log.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
311             CambriaErrorResponse cambriaErrorResponse = processError(e);
312
313             writeErrorToLog(cambriaErrorResponse, methodName, "unregister from topic as " + subscriberTypeEnum
314                     .toString()
315                     .toLowerCase());
316
317             return cambriaErrorResponse;
318         } finally {
319             if (createTopicManager != null) {
320                 createTopicManager.close();
321             }
322         }
323
324         return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
325     }
326
327     /**
328      * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
329      *
330      * @param hostSet
331      * @param managerApiKey
332      * @param managerSecretKey
333      * @param subscriberApiKey
334      * @param subscriberTypeEnum
335      * @param topicName
336      * @return
337      */
338     public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
339
340         String methodName = "registerToTopic";
341         CambriaTopicManager createTopicManager = null;
342         try {
343             createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
344                                                                              .authenticatedBy(managerApiKey, managerSecretKey));
345
346             if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
347                 createTopicManager.allowProducer(topicName, subscriberApiKey);
348             }
349             else {
350                 createTopicManager.allowConsumer(topicName, subscriberApiKey);
351             }
352
353         }
354         catch (HttpObjectNotFoundException e) {
355             log.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
356                                                                                                              .toLowerCase(), e);
357
358             BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
359
360             return new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
361
362         }
363         catch (HttpException | IOException | GeneralSecurityException e) {
364             log.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString()
365                                                                                                              .toLowerCase(), e);
366             CambriaErrorResponse cambriaErrorResponse = processError(e);
367
368             writeErrorToLog(cambriaErrorResponse, methodName, "register to topic as " + subscriberTypeEnum
369                     .toString()
370                     .toLowerCase());
371
372             return cambriaErrorResponse;
373         } finally {
374             if (createTopicManager != null) {
375                 createTopicManager.close();
376             }
377         }
378
379         return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
380     }
381
382     /**
383      * create and retrieve a Cambria Consumer for a specific topic
384      *
385      * @param hostSet
386      * @param topicName
387      * @param apiKey
388      * @param secretKey
389      * @param consumerId
390      * @param consumerGroup
391      * @param timeoutMS
392      * @return
393      * @throws Exception
394      */
395     public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
396
397         CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey)
398                                                         .knownAs(consumerGroup, consumerId)
399                                                         .onTopic(topicName)
400                                                         .usingHosts(hostSet)
401                                                         .waitAtServer(timeoutMS)
402                                                         .build();
403         consumer.setApiCredentials(apiKey, secretKey);
404         return consumer;
405     }
406
407     public void closeConsumer(CambriaConsumer consumer) {
408
409         if (consumer != null) {
410             consumer.close();
411         }
412
413     }
414
415     /**
416      * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
417      *
418      * @param topicConsumer
419      * @return
420      */
421     public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
422
423         String methodName = "fetchFromTopic";
424         try {
425             Iterable<String> messages = topicConsumer.fetch();
426             if (messages == null) {
427                 messages = new ArrayList<>();
428             }
429             return Either.left(messages);
430
431         }
432         catch (IOException e) {
433             CambriaErrorResponse cambriaErrorResponse = processError(e);
434             log.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
435             writeErrorToLog(cambriaErrorResponse, methodName, "get messages from topic");
436             return Either.right(cambriaErrorResponse);
437
438         }
439         catch (Exception e) {
440             log.debug("Failed to fetch from U-EB topic", e);
441             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
442             CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
443             return Either.right(cambriaErrorResponse);
444         }
445     }
446
447     /**
448      * Publish notification message to a given queue
449      *
450      * @param topicName
451      * @param uebPublicKey
452      * @param uebSecretKey
453      * @param uebServers
454      * @param data
455      * @return
456      */
457     public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
458
459         CambriaBatchingPublisher createSimplePublisher = null;
460
461         try {
462
463             String json = gson.toJson(data);
464             log.trace("Before sending notification data {} to topic {}", json, topicName);
465
466             createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
467             createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
468
469             int result = createSimplePublisher.send(PARTITION_KEY, json);
470
471             try {
472                 SECONDS.sleep(1L);
473             }
474             catch (InterruptedException e) {
475                 log.debug("Failed during sleep after sending the message.", e);
476             }
477
478             log.debug("After sending notification data to topic {}. result is {}", topicName, result);
479
480             return new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
481
482         } catch (IOException | GeneralSecurityException e) {
483             log.debug("Failed to send notification {} to topic {} ", data, topicName, e);
484
485             CambriaErrorResponse cambriaErrorResponse = processError(e);
486
487             writeErrorToLog(cambriaErrorResponse, "sendNotification", SEND_NOTIFICATION);
488
489             return cambriaErrorResponse;
490         }
491         finally {
492             if (createSimplePublisher != null) {
493                 log.debug("Before closing publisher");
494                 createSimplePublisher.close();
495                 log.debug("After closing publisher");
496             }
497         }
498     }
499
500     public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
501         String methodName = "sendNotificationAndClose";
502         CambriaBatchingPublisher createSimplePublisher;
503         CambriaErrorResponse response;
504         try {
505
506             String json = gson.toJson(data);
507             log.debug("Before sending notification data {} to topic {}", json, topicName);
508
509             createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
510             createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
511
512             int result = createSimplePublisher.send(PARTITION_KEY, json);
513
514             try {
515                 Thread.sleep(1000);
516             }
517             catch (InterruptedException e) {
518                 log.debug("Failed during sleep after sending the message.", e);
519             }
520
521             log.debug("After sending notification data to topic {}. result is {}", topicName, result);
522
523         }
524         catch (IOException | GeneralSecurityException  e) {
525             log.debug("Failed to send notification {} to topic {} ", data, topicName, e);
526
527
528             response = processError(e);
529
530             writeErrorToLog(response, methodName, SEND_NOTIFICATION);
531
532             return response;
533
534         }
535
536         log.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
537         try {
538             List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, SECONDS);
539             if (messagesInQ != null && !messagesInQ.isEmpty()) {
540                 log.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
541                 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
542                 writeErrorToLog(response, methodName, SEND_NOTIFICATION);
543             }
544             else {
545                 log.debug("No message left in the queue after closing cambria publisher");
546                 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
547             }
548         }
549         catch (IOException | InterruptedException e) {
550             log.debug("Failed to close cambria publisher", e);
551             response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
552             writeErrorToLog(response, methodName, SEND_NOTIFICATION);
553         }
554         log.debug("After closing publisher");
555
556         return response;
557
558     }
559
560     public CambriaErrorResponse getApiKey(String server, String apiKey) {
561
562         CambriaErrorResponse response;
563         List<String> hostSet = new ArrayList<>();
564         hostSet.add(server);
565         try {
566             CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
567             createIdentityManager.getApiKey(apiKey);
568
569             response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
570
571         }
572         catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
573             log.debug("Failed to fetch api key {} from server {}", apiKey, server, e);
574
575             response = processError(e);
576
577         }
578
579         return response;
580     }
581
582     public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) {
583         Either<ApiCredential, CambriaErrorResponse> result;
584
585         try {
586             CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
587
588             String description = String.format("ASDC Key for %s", CONSUMER_ID);
589             ApiCredential credential = createIdentityManager.createApiKey("", description);
590             createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
591             result = Either.left(credential);
592
593         }
594         catch (Exception e) {
595             log.debug("Failed to create ueb keys for servers {}", hostSet, e);
596
597             result = Either.right(processError(e));
598
599         }
600
601         return result;
602     }
603
604     @VisibleForTesting
605     <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
606         return (T) client.build();
607     }
608 }