2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2019 Samsung. All rights reserved.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============================LICENSE_END===========================================
22 package org.onap.dcae.apod.analytics.cdap.common.persistance.tca;
24 import co.cask.cdap.api.data.schema.Schema;
25 import co.cask.cdap.api.data.schema.UnsupportedTypeException;
26 import co.cask.cdap.api.dataset.DatasetProperties;
27 import co.cask.cdap.api.dataset.lib.IndexedTable;
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
29 import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties;
30 import com.google.common.base.Joiner;
31 import org.apache.commons.lang3.StringEscapeUtils;
32 import org.apache.commons.lang3.tuple.Pair;
33 import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
34 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
35 import org.onap.dcae.apod.analytics.common.service.processor.MessageProcessor;
36 import org.onap.dcae.apod.analytics.common.service.processor.ProcessorContext;
37 import org.onap.dcae.apod.analytics.common.utils.PersistenceUtils;
38 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
39 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
40 import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
41 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
42 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
43 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
44 import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
45 import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 import java.util.Date;
50 import java.util.LinkedList;
51 import java.util.List;
53 import javax.annotation.Nullable;
55 import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;
60 * @author Rajiv Singla . Creation Date: 11/15/2016.
62 public abstract class TCAMessageStatusPersister {
64 private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class);
66 private TCAMessageStatusPersister() {
71 * Saves Message Status in Table. Assumes no alert was generated
73 * @param processorContext processor Context
74 * @param instanceId Instance Id
75 * @param calculatorMessageType Calculation Message Type
76 * @param messageStatusTable Message Status Table
78 public static void persist(final TCACEFProcessorContext processorContext,
80 final TCACalculatorMessageType calculatorMessageType,
81 final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) {
82 persist(processorContext, instanceId, calculatorMessageType, messageStatusTable, null);
86 * Saves Message Status in Table. Sets up alert message aslo
88 * @param processorContext processor Context
89 * @param instanceId Instance Id
90 * @param calculatorMessageType Calculation Message Type
91 * @param messageStatusTable Message Status Table
92 * @param alertMessage Alert message
94 public static void persist(final TCACEFProcessorContext processorContext,
96 final TCACalculatorMessageType calculatorMessageType,
97 final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable,
98 @Nullable final String alertMessage) {
100 final String rowKey = createKey(calculatorMessageType);
102 final Long currentTS = new Date().getTime();
103 final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage());
105 // Find Functional Role and domain
106 final Pair<String, String> domainAndEventName = TCAUtils.getDomainAndEventName(processorContext);
107 final String domain = domainAndEventName.getLeft();
108 final String eventName = domainAndEventName.getRight();
110 final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity.TCAMessageStatusEntityBuilder()
111 .setCreationTS(currentTS).setInstanceId(instanceId).setMessageType(calculatorMessageType.name())
112 .setVesMessage(vesMessage).setDomain(domain).setEventName(eventName).createTCAMessageStatusEntity();
114 // add threshold violation fields
115 addViolatedThreshold(tcaMessageStatusEntity, processorContext);
116 // add processor status and messages
117 addMessageProcessorMessages(tcaMessageStatusEntity, processorContext);
119 tcaMessageStatusEntity.setAlertMessage(
120 alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage)
123 messageStatusTable.write(rowKey, tcaMessageStatusEntity);
125 LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", rowKey);
131 * Create TCA VES Message Status Table Properties
133 * @param timeToLiveSeconds Message Status Table time to live in seconds
135 * @return Message Status table properties
137 public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) {
140 return ObjectMappedTableProperties.builder()
141 .setType(TCAMessageStatusEntity.class)
142 .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME)
143 .setRowKeyExploreType(Schema.Type.STRING)
144 .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds)
145 .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE)
147 } catch (UnsupportedTypeException e) {
148 final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema";
149 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
156 * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity}
158 * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields
159 * @param processorContext processor context
162 private static void addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity,
163 final TCACEFProcessorContext processorContext) {
165 final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
167 if (metricsPerEventName != null
168 && metricsPerEventName.getThresholds() != null
169 && metricsPerEventName.getThresholds().get(0) != null) {
171 final Threshold threshold = metricsPerEventName.getThresholds().get(0);
172 tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath());
173 tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name());
174 tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name());
175 tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue());
182 * Add TCA CEF Message Processor status information
184 * @param tcaMessageStatusEntity message entity that needs to be populated with message processor fields
185 * @param processorContext processor context
188 @SuppressWarnings("unchecked")
189 private static void addMessageProcessorMessages(
190 final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) {
191 final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext
192 .getMessageProcessors();
194 if (messageProcessors != null && !messageProcessors.isEmpty()) {
195 for (Object messageProcessor : messageProcessors) {
196 final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor =
197 (MessageProcessor<TCACEFProcessorContext>) messageProcessor;
199 final String processingState = tcaMessageProcessor.getProcessingState().name();
200 final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull();
202 if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) {
203 tcaMessageStatusEntity.setJsonProcessorStatus(processingState);
204 tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage);
207 if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) {
208 tcaMessageStatusEntity.setDomainFilterStatus(processingState);
209 tcaMessageStatusEntity.setDomainFilterMessage(processingMessage);
212 if (messageProcessor.getClass().equals(TCACEFPolicyEventNameFilter.class)) {
213 tcaMessageStatusEntity.setEventNameFilterStatus(processingState);
214 tcaMessageStatusEntity.setEventNameFilterMessage(processingMessage);
217 if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) {
218 tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState);
219 tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage);
227 * Creates Row Key for TCA VES Message Status table
229 * Row Key = (Message Type + Decreasing Value)
231 * @param calculatorMessageType calculator message type
233 * @return row key string
235 private static String createKey(final TCACalculatorMessageType calculatorMessageType) {
237 final List<String> keyList = new LinkedList<>();
238 keyList.add(calculatorMessageType.name());
239 keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey());
240 return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList);