2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
23 import com.google.common.base.Function;
24 import org.apache.hadoop.conf.Configuration;
25 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;
26 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
28 import javax.annotation.Nonnull;
30 import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
31 import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;
34 * Function that converts {@link Configuration} to {@link DMaaPMRPublisherConfig}
36 * @author Rajiv Singla . Creation Date: 1/26/2017.
38 public class DMaaPSinkConfigMapper implements Function<Configuration, DMaaPMRPublisherConfig> {
41 * Static method to map {@link Configuration} to {@link DMaaPMRPublisherConfig}
43 * @param sinkPluginConfig DMaaP Sink Plugin Config
45 * @return DMaaP MR Publisher Config
47 public static DMaaPMRPublisherConfig map(final Configuration sinkPluginConfig) {
48 return new DMaaPSinkConfigMapper().apply(sinkPluginConfig);
52 * Converts {@link Configuration} to {@link DMaaPMRPublisherConfig}
54 * @param configuration Hadoop Configuration containing DMaaP MR Sink field values
56 * @return DMaaP MR Publisher Config
60 public DMaaPMRPublisherConfig apply(@Nonnull Configuration configuration) {
62 // Create a new publisher settings builder
63 final String hostName = configuration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME);
64 final String topicName = configuration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME);
66 if (isEmpty(hostName) || isEmpty(topicName)) {
67 throw new IllegalStateException("DMaaP MR Sink Host Name and Topic Name must be present");
70 final DMaaPMRPublisherConfig.Builder publisherConfigBuilder =
71 new DMaaPMRPublisherConfig.Builder(hostName, topicName);
73 // Setup up any optional publisher parameters if they are present
74 final String portNumber = configuration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER);
75 if (portNumber != null) {
76 publisherConfigBuilder.setPortNumber(Integer.parseInt(portNumber));
79 final String protocol = configuration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL);
80 if (isPresent(protocol)) {
81 publisherConfigBuilder.setProtocol(protocol);
84 final String userName = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_NAME);
85 if (isPresent(userName)) {
86 publisherConfigBuilder.setUserName(userName);
89 final String userPassword = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_PASS);
90 if (isPresent(userPassword)) {
91 publisherConfigBuilder.setUserPassword(userPassword);
94 final String contentType = configuration.get(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE);
95 if (isPresent(contentType)) {
96 publisherConfigBuilder.setContentType(contentType);
99 final String maxBatchSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE);
100 if (maxBatchSize != null) {
101 publisherConfigBuilder.setMaxBatchSize(Integer.parseInt(maxBatchSize));
104 final String maxRecoveryQueueSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE);
105 if (maxRecoveryQueueSize != null) {
106 publisherConfigBuilder.setMaxRecoveryQueueSize(Integer.parseInt(maxRecoveryQueueSize));
109 return publisherConfigBuilder.build();