Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / transform / filter / JsonPathFilter.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.transform.filter;\r
22 \r
23 import co.cask.cdap.api.annotation.Description;\r
24 import co.cask.cdap.api.annotation.Name;\r
25 import co.cask.cdap.api.annotation.Plugin;\r
26 import co.cask.cdap.api.data.format.StructuredRecord;\r
27 import co.cask.cdap.api.data.schema.Schema;\r
28 import co.cask.cdap.etl.api.Emitter;\r
29 import co.cask.cdap.etl.api.PipelineConfigurer;\r
30 import co.cask.cdap.etl.api.Transform;\r
31 import co.cask.cdap.etl.api.TransformContext;\r
32 import com.google.common.base.Splitter;\r
33 import com.google.common.collect.Maps;\r
34 import com.google.common.collect.Sets;\r
35 import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;\r
36 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig;\r
37 import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;\r
38 import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.JsonPathFilterPluginConfigValidator;\r
39 import org.openecomp.dcae.apod.analytics.common.service.filter.JsonMessageFilterProcessorContext;\r
40 import org.openecomp.dcae.apod.analytics.common.utils.MessageProcessorUtils;\r
41 import org.slf4j.Logger;\r
42 import org.slf4j.LoggerFactory;\r
43 \r
44 import java.util.Map;\r
45 import java.util.Set;\r
46 \r
47 /**\r
48  * Json Path filter Plugin filters incoming schema field based of given json path expected values\r
49  * <p>\r
50  * @author Rajiv Singla . Creation Date: 3/2/2017.\r
51  */\r
52 \r
53 @Plugin(type = Transform.PLUGIN_TYPE)\r
54 @Name("JsonPathFilter")\r
55 @Description("Filters incoming schema field based of given json path expected values")\r
56 public class JsonPathFilter extends Transform<StructuredRecord, StructuredRecord> {\r
57 \r
58     private static final Logger LOG = LoggerFactory.getLogger(JsonPathFilter.class);\r
59 \r
60     private final JsonPathFilterPluginConfig pluginConfig;\r
61     private final Map<String, Set<String>> jsonFilterPathMappings;\r
62 \r
63     public JsonPathFilter(final JsonPathFilterPluginConfig pluginConfig) {\r
64         this.pluginConfig = pluginConfig;\r
65         jsonFilterPathMappings = Maps.newHashMap();\r
66         LOG.info("Created instance of Json Path Filter Plugin with plugin config: {}", pluginConfig);\r
67     }\r
68 \r
69 \r
70     @Override\r
71     public void initialize(final TransformContext context) throws Exception {\r
72         super.initialize(context);\r
73         populateJsonFilterMapping();\r
74     }\r
75 \r
76     @Override\r
77     public void configurePipeline(final PipelineConfigurer pipelineConfigurer) {\r
78         super.configurePipeline(pipelineConfigurer);\r
79         ValidationUtils.validateSettings(pluginConfig, new JsonPathFilterPluginConfigValidator());\r
80         final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();\r
81         CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getIncomingJsonFieldName());\r
82         populateJsonFilterMapping();\r
83         CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema());\r
84     }\r
85 \r
86     @Override\r
87     public void transform(final StructuredRecord inputStructuredRecord, final Emitter<StructuredRecord> emitter)\r
88             throws Exception {\r
89 \r
90         // get input json message\r
91         final String jsonMessage = inputStructuredRecord.get(pluginConfig.getIncomingJsonFieldName());\r
92 \r
93         // process Json Filter Mappings\r
94         final JsonMessageFilterProcessorContext jsonMessageFilterProcessorContext =\r
95                 MessageProcessorUtils.processJsonFilterMappings(jsonMessage, jsonFilterPathMappings);\r
96 \r
97         // create new output record builder and copy any input Structured record values to output record builder\r
98         final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema());\r
99         final StructuredRecord.Builder outputRecordBuilder =\r
100                 CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord);\r
101 \r
102         // add json filter matched field\r
103         final StructuredRecord.Builder outputRecordBuilderWithMatchedField =\r
104                 CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder,\r
105                         outputSchema, pluginConfig.getOutputSchemaFieldName(),\r
106                         jsonMessageFilterProcessorContext.getMatched());\r
107 \r
108         // emit structured record with filtering matched field\r
109         final StructuredRecord outputStructuredRecord = outputRecordBuilderWithMatchedField.build();\r
110 \r
111         LOG.debug("Incoming Json Message: {}.Json Path Filter Output Matched Field: {}", jsonMessage,\r
112                 outputStructuredRecord.get(pluginConfig.getOutputSchemaFieldName()));\r
113 \r
114         emitter.emit(outputStructuredRecord);\r
115 \r
116     }\r
117 \r
118     /**\r
119      * Populates Json Filter Mapping\r
120      */\r
121     private void populateJsonFilterMapping() {\r
122         final Map<String, String> fieldMappings =\r
123                 CDAPPluginUtils.extractFieldMappings(pluginConfig.getJsonFilterMappings());\r
124         if (fieldMappings.isEmpty()) {\r
125             throw new IllegalArgumentException("No Field Mapping found. Invalid Filter mapping configuration");\r
126         }\r
127         final Splitter semiColonSplitter = Splitter.on(";");\r
128         for (Map.Entry<String, String> fieldMappingEntry : fieldMappings.entrySet()) {\r
129             jsonFilterPathMappings.put(fieldMappingEntry.getKey(),\r
130                     Sets.newLinkedHashSet(semiColonSplitter.split(fieldMappingEntry.getValue())));\r
131         }\r
132         LOG.info("Input Json Filter Mappings: {}", jsonFilterPathMappings);\r
133     }\r
134 }\r