b92ecbaf9bcf1f68e05454682559c98d2064a6df
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / batch / sink / dmaap / DMaaPMRSink.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.batch.sink.dmaap;\r
22 \r
23 import co.cask.cdap.api.annotation.Description;\r
24 import co.cask.cdap.api.annotation.Name;\r
25 import co.cask.cdap.api.annotation.Plugin;\r
26 import co.cask.cdap.api.data.batch.Output;\r
27 import co.cask.cdap.api.data.format.StructuredRecord;\r
28 import co.cask.cdap.api.data.schema.Schema;\r
29 import co.cask.cdap.api.dataset.lib.KeyValue;\r
30 import co.cask.cdap.etl.api.Emitter;\r
31 import co.cask.cdap.etl.api.PipelineConfigurer;\r
32 import co.cask.cdap.etl.api.batch.BatchSink;\r
33 import co.cask.cdap.etl.api.batch.BatchSinkContext;\r
34 import org.apache.hadoop.io.NullWritable;\r
35 import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;\r
36 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSinkPluginConfig;\r
37 import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;\r
38 import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.DMaaPMRSinkPluginConfigValidator;\r
39 import org.slf4j.Logger;\r
40 import org.slf4j.LoggerFactory;\r
41 \r
42 /**\r
43  * @author Rajiv Singla . Creation Date: 1/26/2017.\r
44  */\r
45 @Plugin(type = BatchSink.PLUGIN_TYPE)\r
46 @Name("DMaaPMRSink")\r
47 @Description("A batch sink Plugin that publishes messages to DMaaP MR Topic.")\r
48 public class DMaaPMRSink extends BatchSink<StructuredRecord, String, NullWritable> {\r
49 \r
50     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSink.class);\r
51 \r
52     private final DMaaPMRSinkPluginConfig pluginConfig;\r
53 \r
54     public DMaaPMRSink(final DMaaPMRSinkPluginConfig pluginConfig) {\r
55         LOG.debug("Creating DMaaP MR Sink Plugin with plugin Config: {}", pluginConfig);\r
56         this.pluginConfig = pluginConfig;\r
57     }\r
58 \r
59     @Override\r
60     public void configurePipeline(final PipelineConfigurer pipelineConfigurer) {\r
61         super.configurePipeline(pipelineConfigurer);\r
62         ValidationUtils.validateSettings(pluginConfig, new DMaaPMRSinkPluginConfigValidator());\r
63         // validates that input schema contains the field provided in Sink Message Column Name property\r
64         final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();\r
65         CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getMessageColumnName());\r
66     }\r
67 \r
68 \r
69     @Override\r
70     public void prepareRun(BatchSinkContext context) throws Exception {\r
71         context.addOutput(Output.of(pluginConfig.getReferenceName(), new DMaaPMROutputFormatProvider(pluginConfig)));\r
72     }\r
73 \r
74     @Override\r
75     public void transform(StructuredRecord structuredRecord,\r
76                           Emitter<KeyValue<String, NullWritable>> emitter) throws Exception {\r
77         // get incoming message from structured record\r
78         final String incomingMessage = structuredRecord.get(pluginConfig.getMessageColumnName());\r
79 \r
80         // if incoming messages does not have message column name log warning as it should not happen\r
81         if (incomingMessage == null) {\r
82             LOG.warn("Column Name: {}, contains no message.Skipped for DMaaP MR Publishing....",\r
83                     pluginConfig.getMessageColumnName());\r
84         } else {\r
85 \r
86             // emit the messages as key\r
87             emitter.emit(new KeyValue<String, NullWritable>(incomingMessage, null));\r
88         }\r
89     }\r
90 }\r