ebe7d491a8dbf96d7967f83b1c3d602870ba79ab
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / utils / DMaaPSinkConfigMapper.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
22
23 import com.google.common.base.Function;
24 import org.apache.hadoop.conf.Configuration;
25 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;
26 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
27
28 import javax.annotation.Nonnull;
29
30 import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
31 import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;
32
33 /**
34  * Function that converts {@link Configuration} to {@link DMaaPMRPublisherConfig}
35  * <p>
36  * @author Rajiv Singla . Creation Date: 1/26/2017.
37  */
38 public class DMaaPSinkConfigMapper implements Function<Configuration, DMaaPMRPublisherConfig> {
39
40     /**
41      * Static method to map {@link Configuration} to {@link DMaaPMRPublisherConfig}
42      *
43      * @param sinkPluginConfig DMaaP Sink Plugin Config
44      *
45      * @return DMaaP MR Publisher Config
46      */
47     public static DMaaPMRPublisherConfig map(final Configuration sinkPluginConfig) {
48         return new DMaaPSinkConfigMapper().apply(sinkPluginConfig);
49     }
50
51     /**
52      * Converts {@link Configuration} to {@link DMaaPMRPublisherConfig}
53      *
54      * @param configuration Hadoop Configuration containing DMaaP MR Sink field values
55      *
56      * @return DMaaP MR Publisher Config
57      */
58     @Nonnull
59     @Override
60     public DMaaPMRPublisherConfig apply(@Nonnull Configuration configuration) {
61
62         // Create a new publisher settings builder
63         final String hostName = configuration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME);
64         final String topicName = configuration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME);
65
66         if (isEmpty(hostName) || isEmpty(topicName)) {
67             throw new IllegalStateException("DMaaP MR Sink Host Name and Topic Name must be present");
68         }
69
70         final DMaaPMRPublisherConfig.Builder publisherConfigBuilder =
71                 new DMaaPMRPublisherConfig.Builder(hostName, topicName);
72
73         // Setup up any optional publisher parameters if they are present
74         final String portNumber = configuration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER);
75         if (portNumber != null) {
76             publisherConfigBuilder.setPortNumber(Integer.parseInt(portNumber));
77         }
78
79         final String protocol = configuration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL);
80         if (isPresent(protocol)) {
81             publisherConfigBuilder.setProtocol(protocol);
82         }
83
84         final String userName = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_NAME);
85         if (isPresent(userName)) {
86             publisherConfigBuilder.setUserName(userName);
87         }
88
89         final String userPassword = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_PASS);
90         if (isPresent(userPassword)) {
91             publisherConfigBuilder.setUserPassword(userPassword);
92         }
93
94         final String contentType = configuration.get(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE);
95         if (isPresent(contentType)) {
96             publisherConfigBuilder.setContentType(contentType);
97         }
98
99         final String maxBatchSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE);
100         if (maxBatchSize != null) {
101             publisherConfigBuilder.setMaxBatchSize(Integer.parseInt(maxBatchSize));
102         }
103
104         final String maxRecoveryQueueSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE);
105         if (maxRecoveryQueueSize != null) {
106             publisherConfigBuilder.setMaxRecoveryQueueSize(Integer.parseInt(maxRecoveryQueueSize));
107         }
108
109         return publisherConfigBuilder.build();
110
111     }
112 }