Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-common / src / main / java / org / openecomp / dcae / apod / analytics / cdap / common / persistance / tca / TCAMessageStatusPersister.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca;\r
22 \r
23 import co.cask.cdap.api.data.schema.Schema;\r
24 import co.cask.cdap.api.data.schema.UnsupportedTypeException;\r
25 import co.cask.cdap.api.dataset.DatasetProperties;\r
26 import co.cask.cdap.api.dataset.lib.IndexedTable;\r
27 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;\r
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties;\r
29 import com.google.common.base.Joiner;\r
30 import org.apache.commons.lang3.StringEscapeUtils;\r
31 import org.apache.commons.lang3.tuple.Pair;\r
32 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;\r
33 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
34 import org.openecomp.dcae.apod.analytics.common.service.processor.MessageProcessor;\r
35 import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessorContext;\r
36 import org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils;\r
37 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;\r
38 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;\r
39 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;\r
40 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;\r
41 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;\r
42 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;\r
43 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;\r
44 import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;\r
45 import org.slf4j.Logger;\r
46 import org.slf4j.LoggerFactory;\r
47 \r
48 import java.util.Date;\r
49 import java.util.LinkedList;\r
50 import java.util.List;\r
51 \r
52 import javax.annotation.Nullable;\r
53 \r
54 import static org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;\r
55 \r
56 /**\r
57  *\r
58  *\r
59  * @author Rajiv Singla . Creation Date: 11/15/2016.\r
60  */\r
61 public abstract class TCAMessageStatusPersister {\r
62 \r
63     private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class);\r
64 \r
65     private TCAMessageStatusPersister() {\r
66 \r
67     }\r
68 \r
69     /**\r
70      * Saves Message Status in Table. Assumes no alert was generated\r
71      *\r
72      * @param processorContext processor Context\r
73      * @param instanceId Instance Id\r
74      * @param calculatorMessageType Calculation Message Type\r
75      * @param messageStatusTable Message Status Table\r
76      */\r
77     public static void persist(final TCACEFProcessorContext processorContext,\r
78                                final int instanceId,\r
79                                final TCACalculatorMessageType calculatorMessageType,\r
80                                final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) {\r
81         persist(processorContext, instanceId, calculatorMessageType, messageStatusTable, null);\r
82     }\r
83 \r
84     /**\r
85      * Saves Message Status in Table. Sets up alert message aslo\r
86      *\r
87      * @param processorContext processor Context\r
88      * @param instanceId Instance Id\r
89      * @param calculatorMessageType Calculation Message Type\r
90      * @param messageStatusTable Message Status Table\r
91      * @param alertMessage Alert message\r
92      */\r
93     public static void persist(final TCACEFProcessorContext processorContext,\r
94                                final int instanceId,\r
95                                final TCACalculatorMessageType calculatorMessageType,\r
96                                final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable,\r
97                                @Nullable final String alertMessage) {\r
98 \r
99         final String rowKey = createKey(calculatorMessageType);\r
100 \r
101         final Long currentTS = new Date().getTime();\r
102         final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage());\r
103 \r
104         // Find Functional Role and domain\r
105         final Pair<String, String> domainAndEventName = TCAUtils.getDomainAndEventName(processorContext);\r
106         final String domain = domainAndEventName.getLeft();\r
107         final String eventName = domainAndEventName.getRight();\r
108 \r
109         final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS,\r
110                 instanceId, calculatorMessageType.name(), vesMessage, domain, eventName);\r
111 \r
112         // add threshold violation fields\r
113         addViolatedThreshold(tcaMessageStatusEntity, processorContext);\r
114         // add processor status and messages\r
115         addMessageProcessorMessages(tcaMessageStatusEntity, processorContext);\r
116         // add Alert message\r
117         tcaMessageStatusEntity.setAlertMessage(\r
118                 alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage)\r
119         );\r
120 \r
121         messageStatusTable.write(rowKey, tcaMessageStatusEntity);\r
122 \r
123         LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", rowKey);\r
124 \r
125     }\r
126 \r
127 \r
128     /**\r
129      * Create TCA VES Message Status Table Properties\r
130      *\r
131      * @param timeToLiveSeconds Message Status Table time to live in seconds\r
132      *\r
133      * @return Message Status table properties\r
134      */\r
135     public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) {\r
136 \r
137         try {\r
138             return ObjectMappedTableProperties.builder()\r
139                     .setType(TCAMessageStatusEntity.class)\r
140                     .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME)\r
141                     .setRowKeyExploreType(Schema.Type.STRING)\r
142                     .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds)\r
143                     .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE)\r
144                     .build();\r
145         } catch (UnsupportedTypeException e) {\r
146             final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema";\r
147             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
148         }\r
149 \r
150     }\r
151 \r
152 \r
153     /**\r
154      * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity}\r
155      *\r
156      * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields\r
157      * @param processorContext processor context\r
158      *\r
159      */\r
160     private static void addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity,\r
161                                                                final TCACEFProcessorContext processorContext) {\r
162 \r
163         final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();\r
164 \r
165         if (metricsPerEventName != null\r
166                 && metricsPerEventName.getThresholds() != null\r
167                 && metricsPerEventName.getThresholds().get(0) != null) {\r
168 \r
169             final Threshold threshold = metricsPerEventName.getThresholds().get(0);\r
170             tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath());\r
171             tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name());\r
172             tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name());\r
173             tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue());\r
174         }\r
175 \r
176     }\r
177 \r
178 \r
179     /**\r
180      * Add TCA CEF Message Processor status information\r
181      *\r
182      * @param tcaMessageStatusEntity message entity that needs to be populated with message processor fields\r
183      * @param processorContext processor context\r
184      *\r
185      */\r
186     @SuppressWarnings("unchecked")\r
187     private static void addMessageProcessorMessages(\r
188             final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) {\r
189         final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext\r
190                 .getMessageProcessors();\r
191 \r
192         if (messageProcessors != null && !messageProcessors.isEmpty()) {\r
193             for (Object messageProcessor : messageProcessors) {\r
194                 final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor =\r
195                         (MessageProcessor<TCACEFProcessorContext>) messageProcessor;\r
196 \r
197                 final String processingState = tcaMessageProcessor.getProcessingState().name();\r
198                 final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull();\r
199 \r
200                 if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) {\r
201                     tcaMessageStatusEntity.setJsonProcessorStatus(processingState);\r
202                     tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage);\r
203                 }\r
204 \r
205                 if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) {\r
206                     tcaMessageStatusEntity.setDomainFilterStatus(processingState);\r
207                     tcaMessageStatusEntity.setDomainFilterMessage(processingMessage);\r
208                 }\r
209 \r
210                 if (messageProcessor.getClass().equals(TCACEFPolicyEventNameFilter.class)) {\r
211                     tcaMessageStatusEntity.setEventNameFilterStatus(processingState);\r
212                     tcaMessageStatusEntity.setEventNameFilterMessage(processingMessage);\r
213                 }\r
214 \r
215                 if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) {\r
216                     tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState);\r
217                     tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage);\r
218                 }\r
219 \r
220             }\r
221         }\r
222     }\r
223 \r
224     /**\r
225      * Creates Row Key for TCA VES Message Status table\r
226      *\r
227      * Row Key = (Message Type + Decreasing Value)\r
228      *\r
229      * @param calculatorMessageType calculator message type\r
230      *\r
231      * @return row key string\r
232      */\r
233     private static String createKey(final TCACalculatorMessageType calculatorMessageType) {\r
234 \r
235         final List<String> keyList = new LinkedList<>();\r
236         keyList.add(calculatorMessageType.name());\r
237         keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey());\r
238         return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList);\r
239     }\r
240 \r
241 }\r