Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / batch / sink / dmaap / DMaaPMROutputFormatProvider.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;
22
23 import co.cask.cdap.api.data.batch.OutputFormatProvider;
24 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;
25 import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
26 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig;
27 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
28
29 import java.util.LinkedHashMap;
30 import java.util.Map;
31
32 /**
33  * DMaaP MR Output Format Provider used to create Batch Sink Plugin
34  * <p>
35  * @author Rajiv Singla . Creation Date: 1/27/2017.
36  */
37 public class DMaaPMROutputFormatProvider implements OutputFormatProvider {
38
39     private final Map<String, String> sinkConfig;
40
41
42     public DMaaPMROutputFormatProvider(DMaaPMRSinkPluginConfig sinkPluginConfig) {
43
44         // initialize Sink Config - with DMaaP MR Publisher config values
45         sinkConfig = new LinkedHashMap<>();
46
47         // Required fields for sink config
48         sinkConfig.put(DMaaPMRSinkHadoopConfigFields.HOST_NAME, sinkPluginConfig.getHostName());
49         sinkConfig.put(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, sinkPluginConfig.getTopicName());
50
51         final Integer configPortNumber = sinkPluginConfig.getPortNumber();
52         if (configPortNumber != null) {
53             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER, configPortNumber.toString());
54         } else {
55             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,
56                     AnalyticsConstants.DEFAULT_PORT_NUMBER.toString());
57         }
58
59         final String configProtocol = sinkPluginConfig.getProtocol();
60         if (ValidationUtils.isPresent(configProtocol)) {
61             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, configProtocol);
62         } else {
63             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.PROTOCOL, AnalyticsConstants.DEFAULT_PROTOCOL);
64         }
65
66
67         final String configUserName = sinkPluginConfig.getUserName();
68         if (ValidationUtils.isPresent(configUserName)) {
69             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, configUserName);
70         } else {
71             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_NAME, AnalyticsConstants.DEFAULT_USER_NAME);
72         }
73
74         final String configUserPass = sinkPluginConfig.getUserPassword();
75         if (ValidationUtils.isPresent(configUserPass)) {
76             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, configUserPass);
77         } else {
78             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.USER_PASS, AnalyticsConstants.DEFAULT_USER_PASSWORD);
79         }
80
81         final String configContentType = sinkPluginConfig.getContentType();
82         if (ValidationUtils.isPresent(configContentType)) {
83             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, configContentType);
84         } else {
85             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE, AnalyticsConstants.DEFAULT_CONTENT_TYPE);
86         }
87
88
89         final Integer configMaxBatchSize = sinkPluginConfig.getMaxBatchSize();
90         if (configMaxBatchSize != null) {
91             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE, configMaxBatchSize.toString());
92         } else {
93             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,
94                     String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_BATCH_SIZE));
95         }
96
97         final Integer configMaxRecoveryQueueSize = sinkPluginConfig.getMaxRecoveryQueueSize();
98         if (configMaxRecoveryQueueSize != null) {
99             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE, configMaxRecoveryQueueSize.toString());
100         } else {
101             sinkConfig.put(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,
102                     String.valueOf(AnalyticsConstants.DEFAULT_PUBLISHER_MAX_RECOVERY_QUEUE_SIZE));
103         }
104
105     }
106
107     @Override
108     public String getOutputFormatClassName() {
109         return DMaaPMROutputFormat.class.getName();
110     }
111
112     @Override
113     public Map<String, String> getOutputFormatConfiguration() {
114         return sinkConfig;
115     }
116 }