Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / batch / sink / dmaap / DMaaPMROutputFormatProvider.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;\r
22 \r
23 import co.cask.cdap.api.data.batch.OutputFormatProvider;\r
24 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;\r
25 import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;\r
26 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig;\r
27 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
28 \r
29 import java.util.LinkedHashMap;\r
30 import java.util.Map;\r
31 \r
32 /**\r
33  * DMaaP MR Output Format Provider used to create Batch Sink Plugin\r
34  * <p>\r
35  * @author Rajiv Singla . Creation Date: 1/27/2017.\r
36  */\r
37 public class DMaaPMROutputFormatProvider implements OutputFormatProvider {\r
38 \r
39     private final Map<String, String> sinkConfig;\r
40 \r
41 \r
42     public DMaaPMROutputFormatProvider(DMaaPMRSinkPluginConfig sinkPluginConfig) {\r
43 \r
44         // initialize Sink Config - with DMaaP MR Publisher config values\r
45         sinkConfig = new LinkedHashMap<>();\r
46 \r
47         // Required fields for sink config\r
48         sinkConfig.put(DMaaPMRSinkHadoopConfigFields.HOST_NAME, sinkPluginConfig.getHostName());\r
49         sinkConfig.put(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, sinkPluginConfig.getTopicName());\r
50 \r
51         final Integer configPortNumber = sinkPluginConfig.getPortNumber();\r
52         if (configPortNumber != null) {\r
53             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER, configPortNumber.toString());\r
54         } else {\r
55             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,\r
56                     AnalyticsConstants.DEFAULT_PORT_NUMBER.toString());\r
57         }\r
58 \r
59         final String configProtocol = sinkPluginConfig.getProtocol();\r
60         if (ValidationUtils.isPresent(configProtocol)) {\r
61             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, configProtocol);\r
62         } else {\r
63             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, AnalyticsConstants.DEFAULT_PROTOCOL);\r
64         }\r
65 \r
66 \r
67         final String configUserName = sinkPluginConfig.getUserName();\r
68         if (ValidationUtils.isPresent(configUserName)) {\r
69             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, configUserName);\r
70         } else {\r
71             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, AnalyticsConstants.DEFAULT_USER_NAME);\r
72         }\r
73 \r
74         final String configUserPass = sinkPluginConfig.getUserPassword();\r
75         if (ValidationUtils.isPresent(configUserPass)) {\r
76             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, configUserPass);\r
77         } else {\r
78             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, AnalyticsConstants.DEFAULT_USER_PASSWORD);\r
79         }\r
80 \r
81         final String configContentType = sinkPluginConfig.getContentType();\r
82         if (ValidationUtils.isPresent(configContentType)) {\r
83             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, configContentType);\r
84         } else {\r
85             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, AnalyticsConstants.DEFAULT_CONTENT_TYPE);\r
86         }\r
87 \r
88 \r
89         final Integer configMaxBatchSize = sinkPluginConfig.getMaxBatchSize();\r
90         if (configMaxBatchSize != null) {\r
91             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE, configMaxBatchSize.toString());\r
92         } else {\r
93             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,\r
94                     String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_BATCH_SIZE));\r
95         }\r
96 \r
97         final Integer configMaxRecoveryQueueSize = sinkPluginConfig.getMaxRecoveryQueueSize();\r
98         if (configMaxRecoveryQueueSize != null) {\r
99             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE, configMaxRecoveryQueueSize.toString());\r
100         } else {\r
101             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,\r
102                     String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE));\r
103         }\r
104 \r
105     }\r
106 \r
107     @Override\r
108     public String getOutputFormatClassName() {\r
109         return DMaaPMROutputFormat.class.getName();\r
110     }\r
111 \r
112     @Override\r
113     public Map<String, String> getOutputFormatConfiguration() {\r
114         return sinkConfig;\r
115     }\r
116 }\r