Sync Integ to Master
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / CambriaHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import com.att.nsa.apiClient.credentials.ApiCredential;
24 import com.att.nsa.apiClient.http.HttpException;
25 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
26 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
27 import com.att.nsa.cambria.client.CambriaClient;
28 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
29 import com.att.nsa.cambria.client.CambriaClientBuilders;
30 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
31 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
32 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
33 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
34 import com.att.nsa.cambria.client.CambriaConsumer;
35 import com.att.nsa.cambria.client.CambriaIdentityManager;
36 import com.att.nsa.cambria.client.CambriaPublisher.message;
37 import com.att.nsa.cambria.client.CambriaTopicManager;
38 import com.google.common.annotations.VisibleForTesting;
39 import com.google.gson.Gson;
40 import fj.data.Either;
41 import org.apache.http.HttpStatus;
42 import org.openecomp.sdc.be.config.BeEcompErrorManager;
43 import org.openecomp.sdc.be.config.ConfigurationManager;
44 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import org.springframework.stereotype.Component;
48
49 import java.io.IOException;
50 import java.net.MalformedURLException;
51 import java.security.GeneralSecurityException;
52 import java.util.ArrayList;
53 import java.util.Collection;
54 import java.util.List;
55 import java.util.Set;
56 import java.util.concurrent.TimeUnit;
57 import java.util.regex.Matcher;
58 import java.util.regex.Pattern;
59
60 @Component("cambriaHandler")
61 public class CambriaHandler {
62
63     private static final Logger logger = LoggerFactory.getLogger(CambriaHandler.class);
64
65     private static final String PARTITION_KEY = "asdc" + "aa";
66
67     private final String SEND_NOTIFICATION = "send notification";
68
69     private Gson gson = new Gson();
70
71     private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionStatusTopic().getConsumerId();
72
73
74
75     /**
76      * process the response error from Cambria client
77      *
78      * @param message
79      * @return
80      */
81     private Integer processMessageException(String message) {
82
83         String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" };
84
85         Integer result = checkPattern(patterns[0], message, 2);
86         if (result != null) {
87             return result;
88         }
89         result = checkPattern(patterns[1], message, 2);
90
91         return result;
92
93     }
94
95     /**
96      * check whether the message has a match with a given pattern inside it
97      *
98      * @param patternStr
99      * @param message
100      * @param groupIndex
101      * @return
102      */
103     private Integer checkPattern(String patternStr, String message, int groupIndex) {
104         Integer result = null;
105
106         Pattern pattern = Pattern.compile(patternStr);
107         Matcher matcher = pattern.matcher(message);
108         boolean find = matcher.find();
109         if (find) {
110             String httpCode = matcher.group(groupIndex);
111             if (httpCode != null) {
112                 try {
113                     result = Integer.valueOf(httpCode);
114                 } catch (NumberFormatException e) {
115                     logger.debug("Failed to parse http code {}", httpCode);
116                 }
117             }
118         }
119         return result;
120     }
121
122     /**
123      * retrieve all topics from U-EB server
124      *
125      * @param hostSet
126      * @return
127      */
128     public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
129
130         CambriaTopicManager createTopicManager = null;
131         try {
132
133             createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
134
135             Set<String> topics = createTopicManager.getTopics();
136
137             if (topics == null || true == topics.isEmpty()) {
138                 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
139                 return Either.right(cambriaErrorResponse);
140             }
141
142             return Either.left(topics);
143
144         } catch (IOException | GeneralSecurityException e) {
145             String methodName = new Object() {
146             }.getClass().getEnclosingMethod().getName();
147
148             CambriaErrorResponse cambriaErrorResponse = processError(e);
149
150             logger.debug("Failed to fetch topics from U-EB server", e);
151             writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics");
152
153             return Either.right(cambriaErrorResponse);
154         } finally {
155             if (createTopicManager != null) {
156                 createTopicManager.close();
157             }
158         }
159
160     }
161
162     /**
163      * process the error message from Cambria client.
164      *
165      * set Cambria status and http code in case we succeed to fetch it
166      *
167      * @return
168      */
169     private CambriaErrorResponse processError(Exception e) {
170
171         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
172
173         Integer httpCode = processMessageException(e.getMessage());
174
175         if (httpCode != null) {
176             cambriaErrorResponse.setHttpCode(httpCode);
177             switch (httpCode.intValue()) {
178
179             case 401:
180                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
181                 break;
182             case 409:
183                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
184                 break;
185             case 500:
186                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
187                 break;
188             default:
189                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
190             }
191         } else {
192
193             boolean found = false;
194             Throwable throwable = e.getCause();
195             if (throwable != null) {
196                 String message = throwable.getMessage();
197
198                 Throwable cause = throwable.getCause();
199
200                 if (cause != null) {
201                     Class<?> clazz = cause.getClass();
202                     String className = clazz.getName();
203                     if (className.endsWith("UnknownHostException")) {
204                         cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
205                         cambriaErrorResponse.addVariable(message);
206                         found = true;
207                     }
208                 }
209             }
210
211             if (false == found) {
212                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
213                 cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
214             }
215         }
216
217         return cambriaErrorResponse;
218     }
219
220     /**
221      * write the error to the log
222      *
223      * @param cambriaErrorResponse
224      * @param errorMessage
225      * @param methodName
226      * @param operationDesc
227      */
228     private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) {
229
230         String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode());
231
232         switch (cambriaErrorResponse.getOperationStatus()) {
233         case UNKNOWN_HOST_ERROR:
234             String hostname = cambriaErrorResponse.getVariables().get(0);
235             BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
236             break;
237         case AUTHENTICATION_ERROR:
238             BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
239             break;
240         case CONNNECTION_ERROR:
241             BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
242             break;
243
244         case INTERNAL_SERVER_ERROR:
245             BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
246             break;
247         default:
248             break;
249         }
250
251     }
252
253     /**
254      * create a topic if it does not exists in the topicsList
255      *
256      * @param hostSet
257      *            - list of U-EB servers
258      * @param apiKey
259      * @param secretKey
260      * @param topicName
261      *            - topic to create
262      * @param partitionCount
263      * @param replicationCount
264      * @return
265      */
266     public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
267
268         CambriaTopicManager createTopicManager = null;
269         try {
270
271             createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey));
272
273             createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
274
275         } catch (HttpException | IOException | GeneralSecurityException e) {
276
277             logger.debug("Failed to create topic {}", topicName, e);
278             String methodName = new Object() {
279             }.getClass().getEnclosingMethod().getName();
280
281             CambriaErrorResponse cambriaErrorResponse = processError(e);
282
283             if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
284                 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic");
285             }
286
287             return cambriaErrorResponse;
288
289         } finally {
290             if (createTopicManager != null) {
291                 createTopicManager.close();
292             }
293         }
294         return new CambriaErrorResponse(CambriaOperationStatus.OK);
295
296     }
297
298     public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
299         CambriaTopicManager createTopicManager = null;
300         try {
301             createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
302
303             if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
304                 createTopicManager.revokeProducer(topicName, subscriberApiKey);
305             } else {
306                 createTopicManager.revokeConsumer(topicName, subscriberApiKey);
307             }
308
309         } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
310             logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
311             String methodName = new Object() {
312             }.getClass().getEnclosingMethod().getName();
313
314             BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
315
316             CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
317             return cambriaErrorResponse;
318
319         } catch (HttpException | IOException e) {
320             logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
321             String methodName = new Object() {
322             }.getClass().getEnclosingMethod().getName();
323
324             CambriaErrorResponse cambriaErrorResponse = processError(e);
325
326             writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase());
327
328             return cambriaErrorResponse;
329         } finally {
330             if (createTopicManager != null) {
331                 createTopicManager.close();
332             }
333         }
334
335         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
336         return cambriaErrorResponse;
337     }
338
339     /**
340      *
341      * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
342      *
343      * @param hostSet
344      * @param managerApiKey
345      * @param managerSecretKey
346      * @param subscriberApiKey
347      * @param subscriberTypeEnum
348      * @param topicName
349      * @return
350      */
351     public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
352
353         CambriaTopicManager createTopicManager = null;
354         try {
355             createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
356
357             if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
358                 createTopicManager.allowProducer(topicName, subscriberApiKey);
359             } else {
360                 createTopicManager.allowConsumer(topicName, subscriberApiKey);
361             }
362
363         } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
364             logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
365             String methodName = new Object() {
366             }.getClass().getEnclosingMethod().getName();
367
368             BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
369
370             CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
371             return cambriaErrorResponse;
372
373         } catch (HttpException | IOException e) {
374             logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
375             String methodName = new Object() {
376             }.getClass().getEnclosingMethod().getName();
377
378             CambriaErrorResponse cambriaErrorResponse = processError(e);
379
380             writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase());
381
382             return cambriaErrorResponse;
383         } finally {
384             if (createTopicManager != null) {
385                 createTopicManager.close();
386             }
387         }
388
389         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
390         return cambriaErrorResponse;
391     }
392
393     /**
394      * create and retrieve a Cambria Consumer for a specific topic
395      *
396      * @param hostSet
397      * @param topicName
398      * @param apiKey
399      * @param secretKey
400      * @param consumerId
401      * @param consumerGroup
402      * @param timeoutMS
403      * @return
404      * @throws Exception
405      */
406     public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
407
408         CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHosts(hostSet).waitAtServer(timeoutMS).build();
409         consumer.setApiCredentials(apiKey, secretKey);
410         return consumer;
411     }
412
413     public void closeConsumer(CambriaConsumer consumer) {
414
415         if (consumer != null) {
416             consumer.close();
417         }
418
419     }
420
421     /**
422      * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
423      *
424      * @param topicConsumer
425      * @return
426      */
427     public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
428
429         try {
430             Iterable<String> messages = topicConsumer.fetch();
431             if (messages == null) {
432                 messages = new ArrayList<String>();
433             }
434             return Either.left(messages);
435
436         } catch (IOException e) {
437             String methodName = new Object() {
438             }.getClass().getEnclosingMethod().getName();
439
440             CambriaErrorResponse cambriaErrorResponse = processError(e);
441
442             logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
443             writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic");
444
445             return Either.right(cambriaErrorResponse);
446
447         } catch (Exception e) {
448             logger.debug("Failed to fetch from U-EB topic", e);
449             String methodName = new Object() {
450             }.getClass().getEnclosingMethod().getName();
451
452             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
453
454             CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
455             return Either.right(cambriaErrorResponse);
456         }
457     }
458
459     /**
460      * Publish notification message to a given queue
461      *
462      * @param topicName
463      * @param uebPublicKey
464      * @param uebSecretKey
465      * @param uebServers
466      * @param data
467      * @return
468      */
469     public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
470
471         CambriaBatchingPublisher createSimplePublisher = null;
472
473         try {
474
475             String json = gson.toJson(data);
476             logger.trace("Before sending notification data {} to topic {}", json, topicName);
477
478             createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
479             createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
480
481             int result = createSimplePublisher.send(PARTITION_KEY, json);
482
483             try {
484                 Thread.sleep(1 * 1000);
485             } catch (InterruptedException e) {
486                 logger.debug("Failed during sleep after sending the message.", e);
487             }
488
489             logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
490
491             CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
492
493             return response;
494
495         } catch (IOException | GeneralSecurityException e) {
496             logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
497
498             String methodName = new Object() {
499             }.getClass().getEnclosingMethod().getName();
500
501             CambriaErrorResponse cambriaErrorResponse = processError(e);
502
503             writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, SEND_NOTIFICATION);
504
505             return cambriaErrorResponse;
506         } finally {
507             if (createSimplePublisher != null) {
508                 logger.debug("Before closing publisher");
509                 createSimplePublisher.close();
510                 logger.debug("After closing publisher");
511             }
512         }
513     }
514
515     public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
516
517         CambriaBatchingPublisher createSimplePublisher = null;
518
519         CambriaErrorResponse response = null;
520         try {
521
522             String json = gson.toJson(data);
523             logger.debug("Before sending notification data {} to topic {}", json, topicName);
524
525             createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
526             createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
527
528             int result = createSimplePublisher.send(PARTITION_KEY, json);
529
530             try {
531                 Thread.sleep(1000);
532             } catch (InterruptedException e) {
533                 logger.debug("Failed during sleep after sending the message.", e);
534             }
535
536             logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
537
538         } catch (IOException | GeneralSecurityException  e) {
539             logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
540
541             String methodName = new Object() {
542             }.getClass().getEnclosingMethod().getName();
543
544             response = processError(e);
545
546             writeErrorToLog(response, e.getMessage(), methodName, SEND_NOTIFICATION);
547
548             return response;
549
550         }
551
552         logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
553         try {
554             List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
555             if (messagesInQ != null && false == messagesInQ.isEmpty()) {
556                 logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
557                 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
558                 String methodName = new Object() {
559                 }.getClass().getEnclosingMethod().getName();
560                 writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION);
561             } else {
562                 logger.debug("No message left in the queue after closing cambria publisher");
563                 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
564             }
565         } catch (IOException | InterruptedException e) {
566             logger.debug("Failed to close cambria publisher", e);
567             response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
568             String methodName = new Object() {
569             }.getClass().getEnclosingMethod().getName();
570             writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION);
571         }
572         logger.debug("After closing publisher");
573
574         return response;
575
576     }
577
578     public CambriaErrorResponse getApiKey(String server, String apiKey) {
579
580         CambriaErrorResponse response = null;
581
582         List<String> hostSet = new ArrayList<>();
583         hostSet.add(server);
584         try {
585             CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
586             createIdentityManager.getApiKey(apiKey);
587
588             response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
589
590         } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
591             logger.debug("Failed to fetch api key {} from server {}", apiKey, server, e);
592
593             response = processError(e);
594
595         }
596
597         return response;
598     }
599
600     public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) {
601         Either<ApiCredential, CambriaErrorResponse> result;
602
603         try {
604             CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
605
606             String description = String.format("ASDC Key for %s", CONSUMER_ID);
607             ApiCredential credential = createIdentityManager.createApiKey("", description);
608             createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
609             result = Either.left(credential);
610
611         } catch (Exception e) {
612             logger.debug("Failed to create ueb keys for servers {}",hostSet, e);
613
614             result = Either.right(processError(e));
615
616         }
617
618         return result;
619     }
620
621     @VisibleForTesting
622     <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
623         return (T)client.build();
624     }
625 }