2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.onap.dcae.apod.analytics.cdap.common.persistance.tca;
23 import co.cask.cdap.api.data.schema.Schema;
24 import co.cask.cdap.api.data.schema.UnsupportedTypeException;
25 import co.cask.cdap.api.dataset.DatasetProperties;
26 import co.cask.cdap.api.dataset.lib.IndexedTable;
27 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties;
29 import com.google.common.base.Joiner;
30 import org.apache.commons.lang3.StringEscapeUtils;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
33 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
34 import org.onap.dcae.apod.analytics.common.service.processor.MessageProcessor;
35 import org.onap.dcae.apod.analytics.common.service.processor.ProcessorContext;
36 import org.onap.dcae.apod.analytics.common.utils.PersistenceUtils;
37 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
38 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
39 import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
40 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
41 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
42 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
43 import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
44 import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import java.util.Date;
49 import java.util.LinkedList;
50 import java.util.List;
52 import javax.annotation.Nullable;
54 import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;
59 * @author Rajiv Singla . Creation Date: 11/15/2016.
61 public abstract class TCAMessageStatusPersister {
63 private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class);
65 private TCAMessageStatusPersister() {
70 * Saves Message Status in Table. Assumes no alert was generated
72 * @param processorContext processor Context
73 * @param instanceId Instance Id
74 * @param calculatorMessageType Calculation Message Type
75 * @param messageStatusTable Message Status Table
77 public static void persist(final TCACEFProcessorContext processorContext,
79 final TCACalculatorMessageType calculatorMessageType,
80 final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) {
81 persist(processorContext, instanceId, calculatorMessageType, messageStatusTable, null);
85 * Saves Message Status in Table. Sets up alert message aslo
87 * @param processorContext processor Context
88 * @param instanceId Instance Id
89 * @param calculatorMessageType Calculation Message Type
90 * @param messageStatusTable Message Status Table
91 * @param alertMessage Alert message
93 public static void persist(final TCACEFProcessorContext processorContext,
95 final TCACalculatorMessageType calculatorMessageType,
96 final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable,
97 @Nullable final String alertMessage) {
99 final String rowKey = createKey(calculatorMessageType);
101 final Long currentTS = new Date().getTime();
102 final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage());
104 // Find Functional Role and domain
105 final Pair<String, String> domainAndEventName = TCAUtils.getDomainAndEventName(processorContext);
106 final String domain = domainAndEventName.getLeft();
107 final String eventName = domainAndEventName.getRight();
109 final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS,
110 instanceId, calculatorMessageType.name(), vesMessage, domain, eventName);
112 // add threshold violation fields
113 addViolatedThreshold(tcaMessageStatusEntity, processorContext);
114 // add processor status and messages
115 addMessageProcessorMessages(tcaMessageStatusEntity, processorContext);
117 tcaMessageStatusEntity.setAlertMessage(
118 alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage)
121 messageStatusTable.write(rowKey, tcaMessageStatusEntity);
123 LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", rowKey);
129 * Create TCA VES Message Status Table Properties
131 * @param timeToLiveSeconds Message Status Table time to live in seconds
133 * @return Message Status table properties
135 public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) {
138 return ObjectMappedTableProperties.builder()
139 .setType(TCAMessageStatusEntity.class)
140 .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME)
141 .setRowKeyExploreType(Schema.Type.STRING)
142 .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds)
143 .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE)
145 } catch (UnsupportedTypeException e) {
146 final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema";
147 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
154 * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity}
156 * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields
157 * @param processorContext processor context
160 private static void addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity,
161 final TCACEFProcessorContext processorContext) {
163 final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
165 if (metricsPerEventName != null
166 && metricsPerEventName.getThresholds() != null
167 && metricsPerEventName.getThresholds().get(0) != null) {
169 final Threshold threshold = metricsPerEventName.getThresholds().get(0);
170 tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath());
171 tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name());
172 tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name());
173 tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue());
180 * Add TCA CEF Message Processor status information
182 * @param tcaMessageStatusEntity message entity that needs to be populated with message processor fields
183 * @param processorContext processor context
186 @SuppressWarnings("unchecked")
187 private static void addMessageProcessorMessages(
188 final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) {
189 final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext
190 .getMessageProcessors();
192 if (messageProcessors != null && !messageProcessors.isEmpty()) {
193 for (Object messageProcessor : messageProcessors) {
194 final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor =
195 (MessageProcessor<TCACEFProcessorContext>) messageProcessor;
197 final String processingState = tcaMessageProcessor.getProcessingState().name();
198 final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull();
200 if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) {
201 tcaMessageStatusEntity.setJsonProcessorStatus(processingState);
202 tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage);
205 if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) {
206 tcaMessageStatusEntity.setDomainFilterStatus(processingState);
207 tcaMessageStatusEntity.setDomainFilterMessage(processingMessage);
210 if (messageProcessor.getClass().equals(TCACEFPolicyEventNameFilter.class)) {
211 tcaMessageStatusEntity.setEventNameFilterStatus(processingState);
212 tcaMessageStatusEntity.setEventNameFilterMessage(processingMessage);
215 if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) {
216 tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState);
217 tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage);
225 * Creates Row Key for TCA VES Message Status table
227 * Row Key = (Message Type + Decreasing Value)
229 * @param calculatorMessageType calculator message type
231 * @return row key string
233 private static String createKey(final TCACalculatorMessageType calculatorMessageType) {
235 final List<String> keyList = new LinkedList<>();
236 keyList.add(calculatorMessageType.name());
237 keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey());
238 return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList);