2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca;
23 import co.cask.cdap.api.data.schema.Schema;
24 import co.cask.cdap.api.data.schema.UnsupportedTypeException;
25 import co.cask.cdap.api.dataset.DatasetProperties;
26 import co.cask.cdap.api.dataset.lib.IndexedTable;
27 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties;
29 import com.google.common.base.Joiner;
30 import org.apache.commons.lang3.StringEscapeUtils;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
33 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
34 import org.openecomp.dcae.apod.analytics.common.service.processor.MessageProcessor;
35 import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessorContext;
36 import org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils;
37 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole;
38 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
39 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
40 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
41 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyFunctionalRoleFilter;
42 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
43 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
44 import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import java.util.Date;
49 import java.util.LinkedList;
50 import java.util.List;
52 import javax.annotation.Nullable;
54 import static org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;
59 * @author Rajiv Singla . Creation Date: 11/15/2016.
61 public abstract class TCAMessageStatusPersister {
63 private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class);
65 private TCAMessageStatusPersister() {
70 * Saves Message Status in Table. Assumes no alert was generated
72 * @param processorContext processor Context
73 * @param instanceId Instance Id
74 * @param calculatorMessageType Calculation Message Type
75 * @param messageStatusTable Message Status Table
77 public static void persist(final TCACEFProcessorContext processorContext,
79 final TCACalculatorMessageType calculatorMessageType,
80 final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) {
81 persist(processorContext, instanceId, calculatorMessageType, messageStatusTable, null);
85 * Saves Message Status in Table. Sets up alert message aslo
87 * @param processorContext processor Context
88 * @param instanceId Instance Id
89 * @param calculatorMessageType Calculation Message Type
90 * @param messageStatusTable Message Status Table
91 * @param alertMessage Alert message
93 public static void persist(final TCACEFProcessorContext processorContext,
95 final TCACalculatorMessageType calculatorMessageType,
96 final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable,
97 @Nullable final String alertMessage) {
99 final String rowKey = createKey(calculatorMessageType);
101 final Long currentTS = new Date().getTime();
102 final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage());
104 // Find Functional Role and domain
105 final Pair<String, String> domainAndFunctionalRole = TCAUtils.getDomainAndFunctionalRole(processorContext);
106 final String domain = domainAndFunctionalRole.getLeft();
107 final String functionalRole = domainAndFunctionalRole.getRight();
109 final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS,
110 instanceId, calculatorMessageType.name(), vesMessage, domain, functionalRole);
112 // add threshold violation fields
113 addViolatedThreshold(tcaMessageStatusEntity, processorContext);
114 // add processor status and messages
115 addMessageProcessorMessages(tcaMessageStatusEntity, processorContext);
117 tcaMessageStatusEntity.setAlertMessage(
118 alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage)
121 messageStatusTable.write(rowKey, tcaMessageStatusEntity);
123 LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", rowKey);
129 * Create TCA VES Message Status Table Properties
131 * @param timeToLiveSeconds Message Status Table time to live in seconds
133 * @return Message Status table properties
135 public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) {
138 return ObjectMappedTableProperties.builder()
139 .setType(TCAMessageStatusEntity.class)
140 .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME)
141 .setRowKeyExploreType(Schema.Type.STRING)
142 .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds)
143 .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE)
145 } catch (UnsupportedTypeException e) {
146 final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema";
147 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
154 * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity}
156 * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields
157 * @param processorContext processor context
159 * @return entity with populated threshold field values if present
161 public static TCAMessageStatusEntity addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity,
162 final TCACEFProcessorContext processorContext) {
164 final MetricsPerFunctionalRole metricsPerFunctionalRole = processorContext.getMetricsPerFunctionalRole();
166 if (metricsPerFunctionalRole != null
167 && metricsPerFunctionalRole.getThresholds() != null
168 && metricsPerFunctionalRole.getThresholds().get(0) != null) {
170 final Threshold threshold = metricsPerFunctionalRole.getThresholds().get(0);
171 tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath());
172 tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name());
173 tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name());
174 tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue());
177 return tcaMessageStatusEntity;
182 * Add TCA CEF Message Processor status information
184 * @param tcaMessageStatusEntity message entity that needs to be populated with message processor fields
185 * @param processorContext processor context
187 * @return entity with populated message process status information
189 public static TCAMessageStatusEntity addMessageProcessorMessages(
190 final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) {
191 final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext
192 .getMessageProcessors();
194 if (messageProcessors != null && !messageProcessors.isEmpty()) {
195 for (Object messageProcessor : messageProcessors) {
196 final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor =
197 (MessageProcessor<TCACEFProcessorContext>) messageProcessor;
199 final String processingState = tcaMessageProcessor.getProcessingState().name();
200 final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull();
202 if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) {
203 tcaMessageStatusEntity.setJsonProcessorStatus(processingState);
204 tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage);
207 if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) {
208 tcaMessageStatusEntity.setDomainFilterStatus(processingState);
209 tcaMessageStatusEntity.setDomainFilterMessage(processingMessage);
212 if (messageProcessor.getClass().equals(TCACEFPolicyFunctionalRoleFilter.class)) {
213 tcaMessageStatusEntity.setFunctionalRoleFilterStatus(processingState);
214 tcaMessageStatusEntity.setFunctionalRoleFilterMessage(processingMessage);
217 if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) {
218 tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState);
219 tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage);
224 return tcaMessageStatusEntity;
228 * Creates Row Key for TCA VES Message Status table
230 * Row Key = (Message Type + Decreasing Value)
232 * @param calculatorMessageType calculator message type
234 * @return row key string
236 public static String createKey(final TCACalculatorMessageType calculatorMessageType) {
238 final List<String> keyList = new LinkedList<>();
239 keyList.add(calculatorMessageType.name());
240 keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey());
241 return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList);