Merge "New KPI Compution MS"
[dcaegen2/services.git] / components / kpi-computation-ms / src / main / java / org / onap / dcaegen2 / kpi / utils / DmaapUtils.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2021 China Mobile.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.kpi.utils;
22
23 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
28
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31
32 import org.onap.dcaegen2.kpi.models.Configuration;
33
34 /**
35  * Utility class to perform actions related to Dmaap.
36  *
37  * @author Kai Lu
38  *
39  */
40 public class DmaapUtils {
41
42     /**
43      * Build publisher.
44      */
45     public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
46         try {
47             return builder(config, topic).build();
48         } catch (MalformedURLException | GeneralSecurityException e) {
49             return null;
50
51         }
52     }
53
54     /**
55      * Build consumer.
56      */
57     public CambriaConsumer buildConsumer(Configuration config, String topic) {
58
59         try {
60             return builderConsumer(config, topic).build();
61         } catch (MalformedURLException | GeneralSecurityException e) {
62             return null;
63         }
64
65     }
66
67     private static PublisherBuilder builder(Configuration config, String topic) {
68         if (config.isSecured()) {
69             return authenticatedBuilder(config, topic);
70         } else {
71             return unAuthenticatedBuilder(config, topic);
72         }
73     }
74
75     private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
76         return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
77                 config.getAafPassword());
78     }
79
80     private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
81         return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
82                 .logSendFailuresAfter(5);
83     }
84
85     private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
86         if (config.isSecured()) {
87             return authenticatedConsumerBuilder(config, topic);
88         } else {
89             return unAuthenticatedConsumerBuilder(config, topic);
90         }
91     }
92
93     private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
94         return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
95                 .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
96     }
97
98     private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
99         return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
100                 config.getAafPassword());
101     }
102
103 }