Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / transform / filter / JsonPathFilter.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.transform.filter;
22
23 import co.cask.cdap.api.annotation.Description;
24 import co.cask.cdap.api.annotation.Name;
25 import co.cask.cdap.api.annotation.Plugin;
26 import co.cask.cdap.api.data.format.StructuredRecord;
27 import co.cask.cdap.api.data.schema.Schema;
28 import co.cask.cdap.etl.api.Emitter;
29 import co.cask.cdap.etl.api.PipelineConfigurer;
30 import co.cask.cdap.etl.api.Transform;
31 import co.cask.cdap.etl.api.TransformContext;
32 import com.google.common.base.Splitter;
33 import com.google.common.collect.Maps;
34 import com.google.common.collect.Sets;
35 import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
36 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.JsonPathFilterPluginConfig;
37 import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
38 import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.JsonPathFilterPluginConfigValidator;
39 import org.openecomp.dcae.apod.analytics.common.service.filter.JsonMessageFilterProcessorContext;
40 import org.openecomp.dcae.apod.analytics.common.utils.MessageProcessorUtils;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.util.Map;
45 import java.util.Set;
46
47 /**
48  * Json Path filter Plugin filters incoming schema field based of given json path expected values
49  * <p>
50  * @author Rajiv Singla . Creation Date: 3/2/2017.
51  */
52
53 @Plugin(type = Transform.PLUGIN_TYPE)
54 @Name("JsonPathFilter")
55 @Description("Filters incoming schema field based of given json path expected values")
56 public class JsonPathFilter extends Transform<StructuredRecord, StructuredRecord> {
57
58     private static final Logger LOG = LoggerFactory.getLogger(JsonPathFilter.class);
59
60     private final JsonPathFilterPluginConfig pluginConfig;
61     private final Map<String, Set<String>> jsonFilterPathMappings;
62
63     public JsonPathFilter(final JsonPathFilterPluginConfig pluginConfig) {
64         this.pluginConfig = pluginConfig;
65         jsonFilterPathMappings = Maps.newHashMap();
66         LOG.info("Created instance of Json Path Filter Plugin with plugin config: {}", pluginConfig);
67     }
68
69
70     @Override
71     public void initialize(final TransformContext context) throws Exception {
72         super.initialize(context);
73         populateJsonFilterMapping();
74     }
75
76     @Override
77     public void configurePipeline(final PipelineConfigurer pipelineConfigurer) {
78         super.configurePipeline(pipelineConfigurer);
79         ValidationUtils.validateSettings(pluginConfig, new JsonPathFilterPluginConfigValidator());
80         final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
81         CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getIncomingJsonFieldName());
82         populateJsonFilterMapping();
83         CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema());
84     }
85
86     @Override
87     public void transform(final StructuredRecord inputStructuredRecord, final Emitter<StructuredRecord> emitter)
88             throws Exception {
89
90         // get input json message
91         final String jsonMessage = inputStructuredRecord.get(pluginConfig.getIncomingJsonFieldName());
92
93         // process Json Filter Mappings
94         final JsonMessageFilterProcessorContext jsonMessageFilterProcessorContext =
95                 MessageProcessorUtils.processJsonFilterMappings(jsonMessage, jsonFilterPathMappings);
96
97         // create new output record builder and copy any input Structured record values to output record builder
98         final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema());
99         final StructuredRecord.Builder outputRecordBuilder =
100                 CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord);
101
102         // add json filter matched field
103         final StructuredRecord.Builder outputRecordBuilderWithMatchedField =
104                 CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder,
105                         outputSchema, pluginConfig.getOutputSchemaFieldName(),
106                         jsonMessageFilterProcessorContext.getMatched());
107
108         // emit structured record with filtering matched field
109         final StructuredRecord outputStructuredRecord = outputRecordBuilderWithMatchedField.build();
110
111         LOG.debug("Incoming Json Message: {}.Json Path Filter Output Matched Field: {}", jsonMessage,
112                 outputStructuredRecord.get(pluginConfig.getOutputSchemaFieldName()));
113
114         emitter.emit(outputStructuredRecord);
115
116     }
117
118     /**
119      * Populates Json Filter Mapping
120      */
121     private void populateJsonFilterMapping() {
122         final Map<String, String> fieldMappings =
123                 CDAPPluginUtils.extractFieldMappings(pluginConfig.getJsonFilterMappings());
124         if (fieldMappings.isEmpty()) {
125             throw new IllegalArgumentException("No Field Mapping found. Invalid Filter mapping configuration");
126         }
127         final Splitter semiColonSplitter = Splitter.on(";");
128         for (Map.Entry<String, String> fieldMappingEntry : fieldMappings.entrySet()) {
129             jsonFilterPathMappings.put(fieldMappingEntry.getKey(),
130                     Sets.newLinkedHashSet(semiColonSplitter.split(fieldMappingEntry.getValue())));
131         }
132         LOG.info("Input Json Filter Mappings: {}", jsonFilterPathMappings);
133     }
134 }