1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
22 package org.onap.slice.analysis.ms.utils;
24 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
25 import com.att.nsa.cambria.client.CambriaClient;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
29 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
30 import com.att.nsa.cambria.client.CambriaConsumer;
31 import com.att.nsa.cambria.client.CambriaTopicManager;
33 import java.net.MalformedURLException;
34 import java.security.GeneralSecurityException;
36 import org.onap.slice.analysis.ms.models.Configuration;
39 * Utility class to perform actions related to Dmaap
41 public class DmaapUtils {
46 public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
48 return builder(config, topic).build();
49 } catch (MalformedURLException | GeneralSecurityException e) {
58 public CambriaConsumer buildConsumer(Configuration config, String topic) {
61 return builderConsumer(config, topic).build();
62 } catch (MalformedURLException | GeneralSecurityException e) {
68 private static PublisherBuilder builder(Configuration config, String topic) {
69 if (config.isSecured()) {
70 return authenticatedBuilder(config, topic);
72 return unAuthenticatedBuilder(config, topic);
76 private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
77 return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
78 config.getAafPassword());
81 private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
82 return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
83 .logSendFailuresAfter(5);
86 private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
87 if (config.isSecured()) {
88 return authenticatedConsumerBuilder(config, topic);
90 return unAuthenticatedConsumerBuilder(config, topic);
94 private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
95 return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
96 .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
99 private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
100 return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
101 config.getAafPassword());
105 * Build cambriaClient.
107 public CambriaTopicManager cambriaCLientBuilder(Configuration configuration) {
108 if (configuration.isSecured()) {
109 return authenticatedCambriaCLientBuilder(configuration);
111 return unAuthenticatedCambriaCLientBuilder(configuration);
116 private static CambriaTopicManager authenticatedCambriaCLientBuilder(Configuration config) {
118 return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers())
119 .authenticatedByHttp(config.getAafUsername(), config.getAafPassword()));
120 } catch (MalformedURLException | GeneralSecurityException e) {
125 private static CambriaTopicManager unAuthenticatedCambriaCLientBuilder(Configuration config) {
127 return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers()));
128 } catch (MalformedURLException | GeneralSecurityException e) {
134 @SuppressWarnings("unchecked")
135 private static <T extends CambriaClient> T buildCambriaClient(
136 CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client)
137 throws MalformedURLException, GeneralSecurityException {
138 return (T) client.build();