TCA: Support for VES/A&AI enrichment
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / utils / DMaaPSinkConfigMapper.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;\r
22 \r
23 import com.google.common.base.Function;\r
24 import org.apache.hadoop.conf.Configuration;\r
25 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;\r
26 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;\r
27 \r
28 import javax.annotation.Nonnull;\r
29 \r
30 import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;\r
31 import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;\r
32 \r
33 /**\r
34  * Function that converts {@link Configuration} to {@link DMaaPMRPublisherConfig}\r
35  * <p>\r
36  * @author Rajiv Singla . Creation Date: 1/26/2017.\r
37  */\r
38 public class DMaaPSinkConfigMapper implements Function<Configuration, DMaaPMRPublisherConfig> {\r
39 \r
40     /**\r
41      * Static method to map {@link Configuration} to {@link DMaaPMRPublisherConfig}\r
42      *\r
43      * @param sinkPluginConfig DMaaP Sink Plugin Config\r
44      *\r
45      * @return DMaaP MR Publisher Config\r
46      */\r
47     public static DMaaPMRPublisherConfig map(final Configuration sinkPluginConfig) {\r
48         return new DMaaPSinkConfigMapper().apply(sinkPluginConfig);\r
49     }\r
50 \r
51     /**\r
52      * Converts {@link Configuration} to {@link DMaaPMRPublisherConfig}\r
53      *\r
54      * @param configuration Hadoop Configuration containing DMaaP MR Sink field values\r
55      *\r
56      * @return DMaaP MR Publisher Config\r
57      */\r
58     @Nonnull\r
59     @Override\r
60     public DMaaPMRPublisherConfig apply(@Nonnull Configuration configuration) {\r
61 \r
62         // Create a new publisher settings builder\r
63         final String hostName = configuration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME);\r
64         final String topicName = configuration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME);\r
65 \r
66         if (isEmpty(hostName) || isEmpty(topicName)) {\r
67             throw new IllegalStateException("DMaaP MR Sink Host Name and Topic Name must be present");\r
68         }\r
69 \r
70         final DMaaPMRPublisherConfig.Builder publisherConfigBuilder =\r
71                 new DMaaPMRPublisherConfig.Builder(hostName, topicName);\r
72 \r
73         // Setup up any optional publisher parameters if they are present\r
74         final String portNumber = configuration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER);\r
75         if (portNumber != null) {\r
76             publisherConfigBuilder.setPortNumber(Integer.parseInt(portNumber));\r
77         }\r
78 \r
79         final String protocol = configuration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL);\r
80         if (isPresent(protocol)) {\r
81             publisherConfigBuilder.setProtocol(protocol);\r
82         }\r
83 \r
84         final String userName = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_NAME);\r
85         if (isPresent(userName)) {\r
86             publisherConfigBuilder.setUserName(userName);\r
87         }\r
88 \r
89         final String userPassword = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_PASS);\r
90         if (isPresent(userPassword)) {\r
91             publisherConfigBuilder.setUserPassword(userPassword);\r
92         }\r
93 \r
94         final String contentType = configuration.get(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE);\r
95         if (isPresent(contentType)) {\r
96             publisherConfigBuilder.setContentType(contentType);\r
97         }\r
98 \r
99         final String maxBatchSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE);\r
100         if (maxBatchSize != null) {\r
101             publisherConfigBuilder.setMaxBatchSize(Integer.parseInt(maxBatchSize));\r
102         }\r
103 \r
104         final String maxRecoveryQueueSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE);\r
105         if (maxRecoveryQueueSize != null) {\r
106             publisherConfigBuilder.setMaxRecoveryQueueSize(Integer.parseInt(maxRecoveryQueueSize));\r
107         }\r
108 \r
109         return publisherConfigBuilder.build();\r
110 \r
111     }\r
112 }\r