Merge "Add proxy support"
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / CambriaHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.security.GeneralSecurityException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Set;
31 import java.util.concurrent.TimeUnit;
32 import java.util.regex.Matcher;
33 import java.util.regex.Pattern;
34
35 import com.att.nsa.cambria.client.*;
36 import org.apache.http.HttpStatus;
37 import org.openecomp.sdc.be.config.BeEcompErrorManager;
38 import org.openecomp.sdc.be.config.ConfigurationManager;
39 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
40 import org.openecomp.sdc.common.config.EcompErrorName;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.att.nsa.apiClient.http.HttpException;
45 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
46 import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
47 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
48 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
49 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
50 import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
51 import com.att.nsa.cambria.client.CambriaPublisher.message;
52 import com.google.gson.Gson;
53
54 import fj.data.Either;
55 import jline.internal.Log;
56
57 public class CambriaHandler {
58
59         private static Logger logger = LoggerFactory.getLogger(CambriaHandler.class.getName());
60
61         public static String PARTITION_KEY = "asdc" + "aa";
62
63         private Gson gson = new Gson();
64
65         public static boolean useHttpsWithDmaap = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().isUseHttpsWithDmaap();
66
67         public static void main(String[] args) {
68
69                 // String userBodyJson ="{\"artifactName\":\"myartifact\",
70                 // \"artifactType\":\"MURANO-PKG\",
71                 // \"artifactDescription\":\"description\",
72                 // \"payloadData\":\"UEsDBAoAAAAIAAeLb0bDQz\", \"Content-MD5\":
73                 // \"YTg2Mjg4MWJhNmI5NzBiNzdDFkMWI=\" }";
74                 // System.out.println(userBodyJson);
75                 // String encodeBase64Str = GeneralUtililty.calculateMD5 (userBodyJson);
76                 // System.out.println(encodeBase64Str);
77
78                 CambriaTopicManager createTopicManager = null;
79                 try {
80                         List<String> servers = new ArrayList<String>();
81                         // servers.add("uebsb91kcdc.it.sdc.com:3904");
82                         // servers.add("uebsb92kcdc.it.sdc.com:3904");
83                         // servers.add("uebsb93kcdc.it.sdc.com:3904");
84                         servers.add("uebsb91sfdc.it.att.com:3904");
85                         servers.add("uebsb92sfdc.it.att.com:3904");
86
87                         String key = "sSJc5qiBnKy2qrlc";
88                         String secret = "4ZRPzNJfEUK0sSNBvccd2m7X";
89
90                         createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret));
91
92                         String topicName = "ASDC-DISTR-NOTIF-TOPIC-PRODesofer";
93
94                         String clientKey1 = "CGGoorrGPXPx2B1C";
95                         String clientSecret1 = "OTHk2mcCSbskEtHhDw8h5oUa";
96
97                         CambriaTopicManager createStatusTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret));
98                         String reportTopic = "ASDC-DISTR-STATUS-TOPIC-PRODESOFER";
99                         createStatusTopicManager.allowProducer(reportTopic, clientKey1);
100
101                         CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
102                         createSimplePublisher.setApiCredentials(clientKey1, clientSecret1);
103
104                         DistributionStatusNotification distributionStatusNotification = new DistributionStatusNotification();
105                         distributionStatusNotification.setStatus(DistributionStatusNotificationEnum.DEPLOY_OK);
106                         distributionStatusNotification.setArtifactURL("Ssssssss url");
107                         distributionStatusNotification.setDistributionID("idddddddddddddd");
108                         distributionStatusNotification.setTimestamp(System.currentTimeMillis());
109                         distributionStatusNotification.setConsumerID("my consumer id");
110
111                         Gson gson = new Gson();
112                         int result = createSimplePublisher.send(PARTITION_KEY, gson.toJson(distributionStatusNotification));
113
114                         List<message> messagesInQ = createSimplePublisher.close(20, TimeUnit.SECONDS);
115                         System.out.println(messagesInQ == null ? 0 : messagesInQ.size());
116
117                         // createTopicManager.createTopic(topicName, "my test topic", 1, 1);
118
119                         /*
120                          * 
121                          * { "secret": "OTHk2mcCSbskEtHhDw8h5oUa", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "CGGoorrGPXPx2B1C" }
122                          * 
123                          * 
124                          * { "secret": "FSlNJbmGWWBvBLJetQMYxPP6", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "TAIEPO0aDU4VzM0G" }
125                          * 
126                          */
127
128                         String clientKey2 = "TAIEPO0aDU4VzM0G";
129
130                         CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
131                         createConsumer1.setApiCredentials(clientKey1, "OTHk2mcCSbskEtHhDw8h5oUa");
132
133                         createTopicManager.allowConsumer(topicName, clientKey1);
134
135                         CambriaConsumer createConsumer2 = null;
136                         if (true) {
137                                 createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
138                                 createConsumer2.setApiCredentials(clientKey2, "FSlNJbmGWWBvBLJetQMYxPP6");
139
140                                 createTopicManager.allowConsumer(topicName, clientKey2);
141                         }
142
143                         createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
144                         createSimplePublisher.setApiCredentials(key, secret);
145                         createTopicManager.allowProducer(topicName, key);
146
147                         createSimplePublisher.send("aaaa", "{ my testttttttttttttttt }");
148
149                         while (true) {
150
151                                 Iterable<String> fetch1 = createConsumer1.fetch();
152
153                                 Iterator<String> iterator1 = fetch1.iterator();
154                                 while (iterator1.hasNext()) {
155                                         System.out.println("***********************************************");
156                                         System.out.println("client 1" + iterator1.next());
157                                         System.out.println("***********************************************");
158                                 }
159
160                                 if (createConsumer2 != null) {
161                                         Iterable<String> fetch2 = createConsumer2.fetch();
162
163                                         Iterator<String> iterator2 = fetch2.iterator();
164                                         while (iterator2.hasNext()) {
165                                                 System.out.println("***********************************************");
166                                                 System.out.println("client 2" + iterator2.next());
167                                                 System.out.println("***********************************************");
168                                         }
169                                 }
170                                 Thread.sleep(1000 * 20);
171                         }
172
173                         // createTopicManager = CambriaClientFactory.createTopicManager(
174                         // servers, "8F3MDAtMSBwwpSMy", "gzFmsTxSCtO5RQfAccM6PqqX");
175
176                         // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD");
177                         // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD1");
178
179                         // CambriaIdentityManager createIdentityManager =
180                         // CambriaClientFactory.createIdentityManager(null, null, null);
181                         // createIdentityManager.setApiCredentials(arg0, arg1);
182                         // createIdentityManager.cl
183
184                         // String topicName = " ";
185                         // createTopicManager.createTopic(topicName,
186                         // "ASDC distribution notification topic", 1, 1);
187                         //
188                         // Thread.sleep(10 * 1000);
189                         //
190                         // for (int i = 0; i < 5; i++) {
191                         // try {
192                         // boolean openForProducing = createTopicManager
193                         // .isOpenForProducing(topicName);
194                         //
195                         // System.out.println("openForProducing=" + openForProducing);
196                         // createTopicManager.allowProducer(topicName,
197                         // "8F3MDAtMSBwwpSMy");
198                         // Set<String> allowedProducers = createTopicManager
199                         // .getAllowedProducers(topicName);
200                         // System.out.println(allowedProducers);
201                         //
202                         // } catch (Exception e) {
203                         // e.printStackTrace();
204                         // }
205                         // }
206
207                         // createTopicManager.createTopic("", "", 0, 0);
208                         // createTopicManager.allowProducer(arg0, arg1);
209                         // createTopicManager.getTopics();
210                         // createTopicManager.close();
211                         // CambriaClientFactory.
212                         // CambriaBatchingPublisher createSimplePublisher =
213                         // CambriaClientFactory.createSimplePublisher("hostlist", "topic");
214
215                         // CambriaIdentityManager createIdentityManager =
216                         // CambriaClientFactory.createIdentityManager(null, "apiKey",
217                         // "apiSecret");
218                         // createIdentityManager.
219
220                 } catch (Exception e) {
221                         Log.debug("Exception in main test of Cambria Handler: {}", e.getMessage(), e);
222                         e.printStackTrace();
223                 } finally {
224                         if (createTopicManager != null) {
225                                 createTopicManager.close();
226                         }
227                 }
228         }
229
230         /**
231          * process the response error from Cambria client
232          * 
233          * @param message
234          * @return
235          */
236         private Integer processMessageException(String message) {
237
238                 String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" };
239
240                 Integer result = checkPattern(patterns[0], message, 2);
241                 if (result != null) {
242                         return result;
243                 }
244                 result = checkPattern(patterns[1], message, 2);
245
246                 return result;
247
248         }
249
250         /**
251          * check whether the message has a match with a given pattern inside it
252          * 
253          * @param patternStr
254          * @param message
255          * @param groupIndex
256          * @return
257          */
258         private Integer checkPattern(String patternStr, String message, int groupIndex) {
259                 Integer result = null;
260
261                 Pattern pattern = Pattern.compile(patternStr);
262                 Matcher matcher = pattern.matcher(message);
263                 boolean find = matcher.find();
264                 if (find) {
265                         String httpCode = matcher.group(groupIndex);
266                         if (httpCode != null) {
267                                 try {
268                                         result = Integer.valueOf(httpCode);
269                                 } catch (NumberFormatException e) {
270                                         logger.debug("Failed to parse http code {}", httpCode);
271                                 }
272                         }
273                 }
274                 return result;
275         }
276
277         /**
278          * retrieve all topics from U-EB server
279          * 
280          * @param hostSet
281          * @return
282          */
283         public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
284
285                 CambriaTopicManager createTopicManager = null;
286                 try {
287
288                         createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
289
290                         Set<String> topics = createTopicManager.getTopics();
291
292                         if (topics == null || true == topics.isEmpty()) {
293                                 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
294                                 return Either.right(cambriaErrorResponse);
295                         }
296
297                         return Either.left(topics);
298
299                 } catch (IOException | GeneralSecurityException e) {
300                         String methodName = new Object() {
301                         }.getClass().getEnclosingMethod().getName();
302
303                         CambriaErrorResponse cambriaErrorResponse = processError(e);
304
305                         logger.debug("Failed to fetch topics from U-EB server", e);
306                         writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics");
307
308                         return Either.right(cambriaErrorResponse);
309                 } finally {
310                         if (createTopicManager != null) {
311                                 createTopicManager.close();
312                         }
313                 }
314
315         }
316
317         /**
318          * process the error message from Cambria client.
319          * 
320          * set Cambria status and http code in case we succeed to fetch it
321          * 
322          * @return
323          */
324         private CambriaErrorResponse processError(Exception e) {
325
326                 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
327
328                 Integer httpCode = processMessageException(e.getMessage());
329
330                 if (httpCode != null) {
331                         cambriaErrorResponse.setHttpCode(httpCode);
332                         switch (httpCode.intValue()) {
333
334                         case 401:
335                                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
336                                 break;
337                         case 409:
338                                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
339                                 break;
340                         case 500:
341                                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
342                                 break;
343                         default:
344                                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
345                         }
346                 } else {
347
348                         boolean found = false;
349                         Throwable throwable = e.getCause();
350                         if (throwable != null) {
351                                 String message = throwable.getMessage();
352
353                                 Throwable cause = throwable.getCause();
354
355                                 if (cause != null) {
356                                         Class<?> clazz = cause.getClass();
357                                         String className = clazz.getName();
358                                         if (className.endsWith("UnknownHostException")) {
359                                                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
360                                                 cambriaErrorResponse.addVariable(message);
361                                                 found = true;
362                                         }
363                                 }
364                         }
365
366                         if (false == found) {
367                                 cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
368                                 cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
369                         }
370                 }
371
372                 return cambriaErrorResponse;
373         }
374
375         /**
376          * write the error to the log
377          * 
378          * @param cambriaErrorResponse
379          * @param errorMessage
380          * @param methodName
381          * @param operationDesc
382          */
383         private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) {
384
385                 String httpCode = (cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode()));
386
387                 switch (cambriaErrorResponse.getOperationStatus()) {
388                 case UNKNOWN_HOST_ERROR:
389                         String hostname = cambriaErrorResponse.getVariables().get(0);
390                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebUnkownHostError, methodName, hostname);
391                         BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
392                         break;
393                 case AUTHENTICATION_ERROR:
394                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebAuthenticationError, methodName, httpCode);
395                         BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
396                         break;
397                 case CONNNECTION_ERROR:
398                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebConnectionError, methodName, httpCode);
399                         BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
400                         break;
401
402                 case INTERNAL_SERVER_ERROR:
403                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, methodName, operationDesc);
404                         BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
405                         break;
406
407                 }
408
409         }
410
411         /**
412          * create a topic if it does not exists in the topicsList
413          * 
414          * @param hostSet
415          *            - list of U-EB servers
416          * @param apiKey
417          * @param secretKey
418          * @param topicName
419          *            - topic to create
420          * @param partitionCount
421          * @param replicationCount
422          * @return
423          */
424         public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
425
426                 CambriaTopicManager createTopicManager = null;
427                 try {
428
429                         createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey));
430
431                         createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
432
433                 } catch (HttpException | IOException | GeneralSecurityException e) {
434
435                         logger.debug("Failed to create topic {}", topicName, e);
436                         String methodName = new Object() {
437                         }.getClass().getEnclosingMethod().getName();
438
439                         CambriaErrorResponse cambriaErrorResponse = processError(e);
440
441                         if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
442                                 writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic");
443                         }
444
445                         return cambriaErrorResponse;
446
447                 } finally {
448                         if (createTopicManager != null) {
449                                 createTopicManager.close();
450                         }
451                 }
452                 return new CambriaErrorResponse(CambriaOperationStatus.OK);
453
454         }
455
456         public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
457                 CambriaTopicManager createTopicManager = null;
458                 try {
459                         createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
460
461                         if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
462                                 createTopicManager.revokeProducer(topicName, subscriberApiKey);
463                         } else {
464                                 createTopicManager.revokeConsumer(topicName, subscriberApiKey);
465                         }
466
467                 } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
468                         logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
469                         String methodName = new Object() {
470                         }.getClass().getEnclosingMethod().getName();
471
472                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
473                         BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
474
475                         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
476                         return cambriaErrorResponse;
477
478                 } catch (HttpException | IOException e) {
479                         logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
480                         String methodName = new Object() {
481                         }.getClass().getEnclosingMethod().getName();
482
483                         CambriaErrorResponse cambriaErrorResponse = processError(e);
484
485                         writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase());
486
487                         return cambriaErrorResponse;
488                 } finally {
489                         if (createTopicManager != null) {
490                                 createTopicManager.close();
491                         }
492                 }
493
494                 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
495                 return cambriaErrorResponse;
496         }
497
498         /**
499          * 
500          * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
501          * 
502          * @param hostSet
503          * @param topicName
504          * @param managerApiKey
505          * @param managerSecretKey
506          * @param subscriberApiKey
507          * @param subscriberTypeEnum
508          * @return
509          */
510         public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
511
512                 CambriaTopicManager createTopicManager = null;
513                 try {
514                         createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
515
516                         if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
517                                 createTopicManager.allowProducer(topicName, subscriberApiKey);
518                         } else {
519                                 createTopicManager.allowConsumer(topicName, subscriberApiKey);
520                         }
521
522                 } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
523                         logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
524                         String methodName = new Object() {
525                         }.getClass().getEnclosingMethod().getName();
526
527                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
528                         BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
529
530                         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
531                         return cambriaErrorResponse;
532
533                 } catch (HttpException | IOException e) {
534                         logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
535                         String methodName = new Object() {
536                         }.getClass().getEnclosingMethod().getName();
537
538                         CambriaErrorResponse cambriaErrorResponse = processError(e);
539
540                         writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase());
541
542                         return cambriaErrorResponse;
543                 } finally {
544                         if (createTopicManager != null) {
545                                 createTopicManager.close();
546                         }
547                 }
548
549                 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
550                 return cambriaErrorResponse;
551         }
552
553         /**
554          * create and retrieve a Cambria Consumer for a specific topic
555          * 
556          * @param hostSet
557          * @param topicName
558          * @param apiKey
559          * @param secretKey
560          * @param consumerId
561          * @param consumerGroup
562          * @param timeoutMS
563          * @return
564          * @throws Exception 
565          */
566         public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
567
568                 CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
569                 consumer.setApiCredentials(apiKey, secretKey);
570                 return consumer;
571         }
572
573         public void closeConsumer(CambriaConsumer consumer) {
574
575                 if (consumer != null) {
576                         consumer.close();
577                 }
578
579         }
580
581         /**
582          * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
583          * 
584          * @param topicConsumer
585          * @return
586          */
587         public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
588
589                 try {
590                         Iterable<String> messages = topicConsumer.fetch();
591                         if (messages == null) {
592                                 messages = new ArrayList<String>();
593                         }
594                         return Either.left(messages);
595
596                 } catch (IOException e) {
597                         String methodName = new Object() {
598                         }.getClass().getEnclosingMethod().getName();
599
600                         CambriaErrorResponse cambriaErrorResponse = processError(e);
601
602                         logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
603                         writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic");
604
605                         return Either.right(cambriaErrorResponse);
606
607                 } catch (Exception e) {
608                         logger.debug("Failed to fetch from U-EB topic", e);
609                         String methodName = new Object() {
610                         }.getClass().getEnclosingMethod().getName();
611
612                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
613                         BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
614
615                         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
616                         return Either.right(cambriaErrorResponse);
617                 }
618         }
619
620         /**
621          * Publish notification message to a given queue
622          * 
623          * @param topicName
624          * @param uebPublicKey
625          * @param uebSecretKey
626          * @param uebServers
627          * @param data
628          * @return
629          */
630         public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
631
632                 CambriaBatchingPublisher createSimplePublisher = null;
633
634                 try {
635
636                         String json = gson.toJson(data);
637                         logger.trace("Before sending notification data {} to topic {}", json, topicName);
638
639                         createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
640                         createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
641
642                         int result = createSimplePublisher.send(PARTITION_KEY, json);
643
644                         try {
645                                 Thread.sleep(1 * 1000);
646                         } catch (InterruptedException e) {
647                                 logger.debug("Failed during sleep after sending the message.", e);
648                         }
649
650                         logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
651
652                         CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
653
654                         return response;
655
656                 } catch (IOException | GeneralSecurityException e) {
657                         logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
658
659                         String methodName = new Object() {
660                         }.getClass().getEnclosingMethod().getName();
661
662                         CambriaErrorResponse cambriaErrorResponse = processError(e);
663
664                         writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "send notification");
665
666                         return cambriaErrorResponse;
667                 } finally {
668                         if (createSimplePublisher != null) {
669                                 logger.debug("Before closing publisher");
670                                 createSimplePublisher.close();
671                                 logger.debug("After closing publisher");
672                         }
673                 }
674         }
675
676         private String convertListToString(List<String> list) {
677                 StringBuilder builder = new StringBuilder();
678
679                 if (list != null) {
680                         for (int i = 0; i < list.size(); i++) {
681                                 builder.append(list.get(i));
682                                 if (i < list.size() - 1) {
683                                         builder.append(",");
684                                 }
685                         }
686                 }
687
688                 return builder.toString();
689         }
690
691         public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
692
693                 CambriaBatchingPublisher createSimplePublisher = null;
694
695                 CambriaErrorResponse response = null;
696                 try {
697
698                         String json = gson.toJson(data);
699                         logger.debug("Before sending notification data {} to topic {}", json, topicName);
700
701                         createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
702                         createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
703
704                         int result = createSimplePublisher.send(PARTITION_KEY, json);
705
706                         try {
707                                 Thread.sleep(1000);
708                         } catch (InterruptedException e) {
709                                 logger.debug("Failed during sleep after sending the message.", e);
710                         }
711
712                         logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
713
714                 } catch (IOException | GeneralSecurityException e) {
715                         logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
716
717                         String methodName = new Object() {
718                         }.getClass().getEnclosingMethod().getName();
719
720                         response = processError(e);
721
722                         writeErrorToLog(response, e.getMessage(), methodName, "send notification");
723
724                         return response;
725
726                 }
727
728                 logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
729                 try {
730                         List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
731                         if (messagesInQ != null && false == messagesInQ.isEmpty()) {
732                                 logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
733                                 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
734                                 String methodName = new Object() {
735                                 }.getClass().getEnclosingMethod().getName();
736                                 writeErrorToLog(response, "closing publisher returned non sent messages", methodName, "send notification");
737                         } else {
738                                 logger.debug("No message left in the queue after closing cambria publisher");
739                                 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
740                         }
741                 } catch (IOException | InterruptedException e) {
742                         logger.debug("Failed to close cambria publisher", e);
743                         response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
744                         String methodName = new Object() {
745                         }.getClass().getEnclosingMethod().getName();
746                         writeErrorToLog(response, "closing publisher returned non sent messages", methodName, "send notification");
747                 }
748                 logger.debug("After closing publisher");
749
750                 return response;
751
752         }
753
754         public CambriaErrorResponse getApiKey(String server, String apiKey) {
755
756                 CambriaErrorResponse response = null;
757
758                 List<String> hostSet = new ArrayList<>();
759                 hostSet.add(server);
760                 CambriaIdentityManager createIdentityManager = null;
761                 try {
762                         createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
763                         createIdentityManager.getApiKey(apiKey);
764                         response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
765
766                 } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
767                         logger.debug("Failed to fetch api key {} from server ", apiKey, server, e);
768
769                         response = processError(e);
770
771                 }
772
773                 return response;
774         }
775
776         private static <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
777                 if (useHttpsWithDmaap) {
778                         client.usingHttps();
779                 }
780                 return (T)client.build();
781         }
782 }