Sonar Critical Fix
dcaegen2/analytics/tca.git: dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/sparkcompute/tca/SimpleTCAPlugin.java
/*
 * ===============================LICENSE_START======================================
 *  dcae-analytics
 * ================================================================================
 *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============================LICENSE_END===========================================
 */

package org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.format.StructuredRecord.Builder;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.StageMetrics;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
import org.openecomp.dcae.apod.analytics.cdap.plugins.validator.SimpleTCAPluginConfigValidator;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple TCA (Threshold Crossing Alert) Spark Compute plugin which applies a given TCA policy to
 * incoming VES messages and generates an alert when a threshold violation is detected.
 *
 * @author Rajiv Singla . Creation Date: 2/13/2017.
 */

@Plugin(type = SparkCompute.PLUGIN_TYPE)
@Name("SimpleTCAPlugin")
@Description("Used to create TCA (Threshold Crossing Alert) based on given Policy")
@SuppressFBWarnings("SE_INNER_CLASS")
public class SimpleTCAPlugin extends SparkCompute<StructuredRecord, StructuredRecord> {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPlugin.class);
    private static final long serialVersionUID = 1L;

    private final SimpleTCAPluginConfig pluginConfig;

    /**
     * Creates an instance of Simple TCA Plugin with the given Simple TCA Plugin Config
     *
     * @param pluginConfig Simple TCA Plugin Config
     */
    public SimpleTCAPlugin(SimpleTCAPluginConfig pluginConfig) {
        this.pluginConfig = pluginConfig;
        LOG.info("Creating instance of Simple TCA Plugin with plugin config: {}", pluginConfig);
    }

    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        ValidationUtils.validateSettings(pluginConfig, new SimpleTCAPluginConfigValidator());
        final Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
        CDAPPluginUtils.validateSchemaContainsFields(inputSchema, pluginConfig.getVesMessageFieldName());
        CDAPPluginUtils.setOutputSchema(pipelineConfigurer, pluginConfig.getSchema());
    }

    @Override
    public JavaRDD<StructuredRecord> transform(final SparkExecutionPluginContext context,
                                               final JavaRDD<StructuredRecord> input) throws Exception {
        final StageMetrics metrics = context.getMetrics();

        LOG.debug("Invoking Spark Transform for Simple TCA Plugin");
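        // evaluate each incoming VES message record against the TCA policy and emit an enriched output record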
        return input.map(new Function<StructuredRecord, StructuredRecord>() {

            @Override
            public StructuredRecord call(StructuredRecord inputStructuredRecord) throws Exception {
                TCACalculatorMessageType calculatorMessageType;
                String alertMessage = null;

                // Get the CEF message field from the input structured record
                final String cefMessage = inputStructuredRecord.get(pluginConfig.getVesMessageFieldName());

                // Get TCA Policy
                final TCAPolicy tcaPolicy = CDAPPluginUtils.readValue(pluginConfig.getPolicyJson(), TCAPolicy.class);

                // create initial processor context
                final TCACEFProcessorContext initialProcessorContext =
                        new TCACEFProcessorContext(cefMessage, tcaPolicy);

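                // parse the CEF JSON message; a null CEF event listener means the message could not be parsed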
                final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
                final TCACEFProcessorContext jsonProcessorContext =
                        jsonProcessor.processMessage(initialProcessorContext);

                if (jsonProcessorContext.getCEFEventListener() != null) {

                    LOG.debug("Json to CEF parsing successful. Parsed object {}",
                            jsonProcessorContext.getCEFEventListener());

                    // compute violations
                    final TCACEFProcessorContext processorContextWithViolations =
                            TCAUtils.computeThresholdViolations(jsonProcessorContext);

                    // if violations are found then create an alert message
                    if (processorContextWithViolations.canProcessingContinue()) {

                        alertMessage = TCAUtils.createTCAAlertString(processorContextWithViolations,
                                pluginConfig.getReferenceName(), pluginConfig.getEnableAlertCEFFormat());
                        calculatorMessageType = TCACalculatorMessageType.NON_COMPLIANT;

                        LOG.debug("VES Threshold Violation Detected. An alert message will be generated: {}",
                                alertMessage);

                        final MetricsPerEventName metricsPerEventName =
                                processorContextWithViolations.getMetricsPerEventName();
                        if (metricsPerEventName != null
                                && metricsPerEventName.getThresholds() != null
                                && !metricsPerEventName.getThresholds().isEmpty()
                                && metricsPerEventName.getThresholds().get(0) != null) {
                            final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
                            LOG.debug("CEF Message: {}, Violated Threshold: {}", cefMessage, violatedThreshold);
                        }

                        metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1);

                    } else {
                        LOG.debug("No Threshold Violation Detected. No alert will be generated.");
                        calculatorMessageType = TCACalculatorMessageType.COMPLIANT;
                        metrics.count(CDAPMetricsConstants.TCA_VES_COMPLIANT_MESSAGES_METRIC, 1);
                    }

                } else {
                    LOG.info("Unable to parse provided json message to CEF format. Invalid message: {}", cefMessage);
                    calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE;
                }

                LOG.debug("Calculator message type: {} for message: {}", calculatorMessageType, cefMessage);

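                // parse the output schema configured for this plugin stage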
                final Schema outputSchema = Schema.parseJson(pluginConfig.getSchema());

                // create new output record builder and copy any input record values to output record builder
                final Builder outputRecordBuilder =
                        CDAPPluginUtils.createOutputStructuredRecordBuilder(outputSchema, inputStructuredRecord);

                // add alert field
                final Builder outputRecordBuilderWithAlertField =
                        CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilder,
                                outputSchema, pluginConfig.getAlertFieldName(), alertMessage);

                // add message type field
                final Builder outRecordBuilderWithMessageTypeField =
                        CDAPPluginUtils.addFieldValueToStructuredRecordBuilder(outputRecordBuilderWithAlertField,
                                outputSchema, pluginConfig.getMessageTypeFieldName(), calculatorMessageType.toString());

                return outRecordBuilderWithMessageTypeField.build();
            }
        });
    }
}