2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021 China Mobile.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.kpi.utils;
23 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
32 import org.onap.dcaegen2.kpi.models.Configuration;
35 * Utility class to perform actions related to Dmaap.
40 public class DmaapUtils {
45 public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
47 return builder(config, topic).build();
48 } catch (MalformedURLException | GeneralSecurityException e) {
57 public CambriaConsumer buildConsumer(Configuration config, String topic) {
60 return builderConsumer(config, topic).build();
61 } catch (MalformedURLException | GeneralSecurityException e) {
67 private static PublisherBuilder builder(Configuration config, String topic) {
68 if (config.isSecured()) {
69 return authenticatedBuilder(config, topic);
71 return unAuthenticatedBuilder(config, topic);
75 private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
76 return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
77 config.getAafPassword());
80 private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
81 return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
82 .logSendFailuresAfter(5);
85 private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
86 if (config.isSecured()) {
87 return authenticatedConsumerBuilder(config, topic);
89 return unAuthenticatedConsumerBuilder(config, topic);
93 private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
94 return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
95 .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
98 private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
99 return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
100 config.getAafPassword());