2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca;
\r
23 import co.cask.cdap.api.data.schema.Schema;
\r
24 import co.cask.cdap.api.data.schema.UnsupportedTypeException;
\r
25 import co.cask.cdap.api.dataset.DatasetProperties;
\r
26 import co.cask.cdap.api.dataset.lib.IndexedTable;
\r
27 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
\r
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties;
\r
29 import com.google.common.base.Joiner;
\r
30 import org.apache.commons.lang3.StringEscapeUtils;
\r
31 import org.apache.commons.lang3.tuple.Pair;
\r
32 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
\r
33 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
\r
34 import org.openecomp.dcae.apod.analytics.common.service.processor.MessageProcessor;
\r
35 import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessorContext;
\r
36 import org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils;
\r
37 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
\r
38 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
\r
39 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
\r
40 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
\r
41 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
\r
42 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
\r
43 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
\r
44 import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
\r
45 import org.slf4j.Logger;
\r
46 import org.slf4j.LoggerFactory;
\r
48 import java.util.Date;
\r
49 import java.util.LinkedList;
\r
50 import java.util.List;
\r
52 import javax.annotation.Nullable;
\r
54 import static org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;
\r
59 * @author Rajiv Singla . Creation Date: 11/15/2016.
\r
61 public abstract class TCAMessageStatusPersister {
\r
63 private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class);
\r
private TCAMessageStatusPersister() {
    // Every member of this class is static; the private constructor prevents instantiation.
}
\r
70 * Saves Message Status in Table. Assumes no alert was generated
\r
72 * @param processorContext processor Context
\r
73 * @param instanceId Instance Id
\r
74 * @param calculatorMessageType Calculation Message Type
\r
75 * @param messageStatusTable Message Status Table
\r
77 public static void persist(final TCACEFProcessorContext processorContext,
\r
78 final int instanceId,
\r
79 final TCACalculatorMessageType calculatorMessageType,
\r
80 final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) {
\r
81 persist(processorContext, instanceId, calculatorMessageType, messageStatusTable, null);
\r
85 * Saves Message Status in Table. Sets up alert message aslo
\r
87 * @param processorContext processor Context
\r
88 * @param instanceId Instance Id
\r
89 * @param calculatorMessageType Calculation Message Type
\r
90 * @param messageStatusTable Message Status Table
\r
91 * @param alertMessage Alert message
\r
93 public static void persist(final TCACEFProcessorContext processorContext,
\r
94 final int instanceId,
\r
95 final TCACalculatorMessageType calculatorMessageType,
\r
96 final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable,
\r
97 @Nullable final String alertMessage) {
\r
99 final String rowKey = createKey(calculatorMessageType);
\r
101 final Long currentTS = new Date().getTime();
\r
102 final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage());
\r
104 // Find Functional Role and domain
\r
105 final Pair<String, String> domainAndEventName = TCAUtils.getDomainAndEventName(processorContext);
\r
106 final String domain = domainAndEventName.getLeft();
\r
107 final String eventName = domainAndEventName.getRight();
\r
109 final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS,
\r
110 instanceId, calculatorMessageType.name(), vesMessage, domain, eventName);
\r
112 // add threshold violation fields
\r
113 addViolatedThreshold(tcaMessageStatusEntity, processorContext);
\r
114 // add processor status and messages
\r
115 addMessageProcessorMessages(tcaMessageStatusEntity, processorContext);
\r
116 // add Alert message
\r
117 tcaMessageStatusEntity.setAlertMessage(
\r
118 alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage)
\r
121 messageStatusTable.write(rowKey, tcaMessageStatusEntity);
\r
123 LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", rowKey);
\r
129 * Create TCA VES Message Status Table Properties
\r
131 * @param timeToLiveSeconds Message Status Table time to live in seconds
\r
133 * @return Message Status table properties
\r
135 public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) {
\r
138 return ObjectMappedTableProperties.builder()
\r
139 .setType(TCAMessageStatusEntity.class)
\r
140 .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME)
\r
141 .setRowKeyExploreType(Schema.Type.STRING)
\r
142 .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds)
\r
143 .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE)
\r
145 } catch (UnsupportedTypeException e) {
\r
146 final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema";
\r
147 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
154 * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity}
\r
156 * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields
\r
157 * @param processorContext processor context
\r
160 private static void addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity,
\r
161 final TCACEFProcessorContext processorContext) {
\r
163 final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
\r
165 if (metricsPerEventName != null
\r
166 && metricsPerEventName.getThresholds() != null
\r
167 && metricsPerEventName.getThresholds().get(0) != null) {
\r
169 final Threshold threshold = metricsPerEventName.getThresholds().get(0);
\r
170 tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath());
\r
171 tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name());
\r
172 tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name());
\r
173 tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue());
\r
180 * Add TCA CEF Message Processor status information
\r
182 * @param tcaMessageStatusEntity message entity that needs to be populated with message processor fields
\r
183 * @param processorContext processor context
\r
186 @SuppressWarnings("unchecked")
\r
187 private static void addMessageProcessorMessages(
\r
188 final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) {
\r
189 final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext
\r
190 .getMessageProcessors();
\r
192 if (messageProcessors != null && !messageProcessors.isEmpty()) {
\r
193 for (Object messageProcessor : messageProcessors) {
\r
194 final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor =
\r
195 (MessageProcessor<TCACEFProcessorContext>) messageProcessor;
\r
197 final String processingState = tcaMessageProcessor.getProcessingState().name();
\r
198 final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull();
\r
200 if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) {
\r
201 tcaMessageStatusEntity.setJsonProcessorStatus(processingState);
\r
202 tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage);
\r
205 if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) {
\r
206 tcaMessageStatusEntity.setDomainFilterStatus(processingState);
\r
207 tcaMessageStatusEntity.setDomainFilterMessage(processingMessage);
\r
210 if (messageProcessor.getClass().equals(TCACEFPolicyEventNameFilter.class)) {
\r
211 tcaMessageStatusEntity.setEventNameFilterStatus(processingState);
\r
212 tcaMessageStatusEntity.setEventNameFilterMessage(processingMessage);
\r
215 if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) {
\r
216 tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState);
\r
217 tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage);
\r
225 * Creates Row Key for TCA VES Message Status table
\r
227 * Row Key = (Message Type + Decreasing Value)
\r
229 * @param calculatorMessageType calculator message type
\r
231 * @return row key string
\r
233 private static String createKey(final TCACalculatorMessageType calculatorMessageType) {
\r
235 final List<String> keyList = new LinkedList<>();
\r
236 keyList.add(calculatorMessageType.name());
\r
237 keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey());
\r
238 return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList);
\r