89d8d00fd600276cf5c7fbe762d5b02dc845b0ea
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-common / src / main / java / org / onap / dcae / apod / analytics / cdap / common / persistance / tca / TCAMessageStatusPersister.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.dcae.apod.analytics.cdap.common.persistance.tca;
22
23 import co.cask.cdap.api.data.schema.Schema;
24 import co.cask.cdap.api.data.schema.UnsupportedTypeException;
25 import co.cask.cdap.api.dataset.DatasetProperties;
26 import co.cask.cdap.api.dataset.lib.IndexedTable;
27 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties;
29 import com.google.common.base.Joiner;
30 import org.apache.commons.lang3.StringEscapeUtils;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
33 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
34 import org.onap.dcae.apod.analytics.common.service.processor.MessageProcessor;
35 import org.onap.dcae.apod.analytics.common.service.processor.ProcessorContext;
36 import org.onap.dcae.apod.analytics.common.utils.PersistenceUtils;
37 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
38 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
39 import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
40 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
41 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
42 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
43 import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
44 import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import java.util.Date;
49 import java.util.LinkedList;
50 import java.util.List;
51
52 import javax.annotation.Nullable;
53
54 import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;
55
56 /**
57  *
58  *
59  * @author Rajiv Singla . Creation Date: 11/15/2016.
60  */
61 public abstract class TCAMessageStatusPersister {
62
63     private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class);
64
65     private TCAMessageStatusPersister() {
66
67     }
68
69     /**
70      * Saves Message Status in Table. Assumes no alert was generated
71      *
72      * @param processorContext processor Context
73      * @param instanceId Instance Id
74      * @param calculatorMessageType Calculation Message Type
75      * @param messageStatusTable Message Status Table
76      */
77     public static void persist(final TCACEFProcessorContext processorContext,
78                                final int instanceId,
79                                final TCACalculatorMessageType calculatorMessageType,
80                                final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) {
81         persist(processorContext, instanceId, calculatorMessageType, messageStatusTable, null);
82     }
83
84     /**
85      * Saves Message Status in Table. Sets up alert message aslo
86      *
87      * @param processorContext processor Context
88      * @param instanceId Instance Id
89      * @param calculatorMessageType Calculation Message Type
90      * @param messageStatusTable Message Status Table
91      * @param alertMessage Alert message
92      */
93     public static void persist(final TCACEFProcessorContext processorContext,
94                                final int instanceId,
95                                final TCACalculatorMessageType calculatorMessageType,
96                                final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable,
97                                @Nullable final String alertMessage) {
98
99         final String rowKey = createKey(calculatorMessageType);
100
101         final Long currentTS = new Date().getTime();
102         final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage());
103
104         // Find Functional Role and domain
105         final Pair<String, String> domainAndEventName = TCAUtils.getDomainAndEventName(processorContext);
106         final String domain = domainAndEventName.getLeft();
107         final String eventName = domainAndEventName.getRight();
108
109         final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS,
110                 instanceId, calculatorMessageType.name(), vesMessage, domain, eventName);
111
112         // add threshold violation fields
113         addViolatedThreshold(tcaMessageStatusEntity, processorContext);
114         // add processor status and messages
115         addMessageProcessorMessages(tcaMessageStatusEntity, processorContext);
116         // add Alert message
117         tcaMessageStatusEntity.setAlertMessage(
118                 alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage)
119         );
120
121         messageStatusTable.write(rowKey, tcaMessageStatusEntity);
122
123         LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", rowKey);
124
125     }
126
127
128     /**
129      * Create TCA VES Message Status Table Properties
130      *
131      * @param timeToLiveSeconds Message Status Table time to live in seconds
132      *
133      * @return Message Status table properties
134      */
135     public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) {
136
137         try {
138             return ObjectMappedTableProperties.builder()
139                     .setType(TCAMessageStatusEntity.class)
140                     .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME)
141                     .setRowKeyExploreType(Schema.Type.STRING)
142                     .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds)
143                     .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE)
144                     .build();
145         } catch (UnsupportedTypeException e) {
146             final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema";
147             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
148         }
149
150     }
151
152
153     /**
154      * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity}
155      *
156      * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields
157      * @param processorContext processor context
158      *
159      */
160     private static void addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity,
161                                                                final TCACEFProcessorContext processorContext) {
162
163         final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
164
165         if (metricsPerEventName != null
166                 && metricsPerEventName.getThresholds() != null
167                 && metricsPerEventName.getThresholds().get(0) != null) {
168
169             final Threshold threshold = metricsPerEventName.getThresholds().get(0);
170             tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath());
171             tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name());
172             tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name());
173             tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue());
174         }
175
176     }
177
178
179     /**
180      * Add TCA CEF Message Processor status information
181      *
182      * @param tcaMessageStatusEntity message entity that needs to be populated with message processor fields
183      * @param processorContext processor context
184      *
185      */
186     @SuppressWarnings("unchecked")
187     private static void addMessageProcessorMessages(
188             final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) {
189         final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext
190                 .getMessageProcessors();
191
192         if (messageProcessors != null && !messageProcessors.isEmpty()) {
193             for (Object messageProcessor : messageProcessors) {
194                 final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor =
195                         (MessageProcessor<TCACEFProcessorContext>) messageProcessor;
196
197                 final String processingState = tcaMessageProcessor.getProcessingState().name();
198                 final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull();
199
200                 if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) {
201                     tcaMessageStatusEntity.setJsonProcessorStatus(processingState);
202                     tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage);
203                 }
204
205                 if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) {
206                     tcaMessageStatusEntity.setDomainFilterStatus(processingState);
207                     tcaMessageStatusEntity.setDomainFilterMessage(processingMessage);
208                 }
209
210                 if (messageProcessor.getClass().equals(TCACEFPolicyEventNameFilter.class)) {
211                     tcaMessageStatusEntity.setEventNameFilterStatus(processingState);
212                     tcaMessageStatusEntity.setEventNameFilterMessage(processingMessage);
213                 }
214
215                 if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) {
216                     tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState);
217                     tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage);
218                 }
219
220             }
221         }
222     }
223
224     /**
225      * Creates Row Key for TCA VES Message Status table
226      *
227      * Row Key = (Message Type + Decreasing Value)
228      *
229      * @param calculatorMessageType calculator message type
230      *
231      * @return row key string
232      */
233     private static String createKey(final TCACalculatorMessageType calculatorMessageType) {
234
235         final List<String> keyList = new LinkedList<>();
236         keyList.add(calculatorMessageType.name());
237         keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey());
238         return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList);
239     }
240
241 }