Merge "[Datalake] Output logs to stdout."
[dcaegen2/services.git] / components / slice-analysis-ms / src / main / java / org / onap / slice / analysis / ms / utils / DmaapUtils.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  slice-analysis-ms
4  *  ================================================================================
5  *   Copyright (C) 2020 Wipro Limited.
6  *   ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *
20  *******************************************************************************/
21
22 package org.onap.slice.analysis.ms.utils;
23
24 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
25 import com.att.nsa.cambria.client.CambriaClient;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
29 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
30 import com.att.nsa.cambria.client.CambriaConsumer;
31 import com.att.nsa.cambria.client.CambriaTopicManager;
32
33 import java.net.MalformedURLException;
34 import java.security.GeneralSecurityException;
35
36 import org.onap.slice.analysis.ms.beans.Configuration;
37
38 /**
39  * Utility class to perform actions related to Dmaap
40  */
41 public class DmaapUtils {
42
43     /**
44      * Build publisher.
45      */
46     public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
47         try {
48             return builder(config, topic).build();
49         } catch (MalformedURLException | GeneralSecurityException e) {
50             return null;
51
52         }
53     }
54
55     /**
56      * Build consumer.
57      */
58     public CambriaConsumer buildConsumer(Configuration config, String topic) {
59
60         try {
61             return builderConsumer(config, topic).build();
62         } catch (MalformedURLException | GeneralSecurityException e) {
63             return null;
64         }
65
66     }
67
68     private static PublisherBuilder builder(Configuration config, String topic) {
69         if (config.isSecured()) {
70             return authenticatedBuilder(config, topic);
71         } else {
72             return unAuthenticatedBuilder(config, topic);
73         }
74     }
75
76     private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
77         return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
78                 config.getAafPassword());
79     }
80
81     private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
82         return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
83                 .logSendFailuresAfter(5);
84     }
85
86     private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
87         if (config.isSecured()) {
88             return authenticatedConsumerBuilder(config, topic);
89         } else {
90             return unAuthenticatedConsumerBuilder(config, topic);
91         }
92     }
93
94     private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
95         return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
96                 .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
97     }
98
99     private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
100         return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
101                 config.getAafPassword());
102     }
103
104     /**
105      * Build cambriaClient.
106      */
107     public CambriaTopicManager cambriaCLientBuilder(Configuration configuration) {
108         if (configuration.isSecured()) {
109             return authenticatedCambriaCLientBuilder(configuration);
110         } else {
111             return unAuthenticatedCambriaCLientBuilder(configuration);
112
113         }
114     }
115
116     private static CambriaTopicManager authenticatedCambriaCLientBuilder(Configuration config) {
117         try {
118             return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers())
119                     .authenticatedByHttp(config.getAafUsername(), config.getAafPassword()));
120         } catch (MalformedURLException | GeneralSecurityException e) {
121             return null;
122         }
123     }
124
125     private static CambriaTopicManager unAuthenticatedCambriaCLientBuilder(Configuration config) {
126         try {
127             return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers()));
128         } catch (MalformedURLException | GeneralSecurityException e) {
129             return null;
130
131         }
132     }
133
134     @SuppressWarnings("unchecked")
135     private static <T extends CambriaClient> T buildCambriaClient(
136             CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client)
137             throws MalformedURLException, GeneralSecurityException {
138         return (T) client.build();
139     }
140
141 }