package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
+import co.cask.cdap.api.annotation.HashPartition;
import co.cask.cdap.api.annotation.Output;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.Property;
-import co.cask.cdap.api.annotation.RoundRobin;
import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity;
import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class);
@Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
- protected OutputEmitter<String> tcaAlertOutputEmitter;
+ protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter;
protected Metrics metrics;
private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable;
* @throws JsonProcessingException if alert message cannot be parsed into JSON object
*/
@ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT)
- @RoundRobin
+ @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY)
public void filterVESMessages(String vesMessage) throws JsonProcessingException {
TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE;
metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1);
// Step 4: Emit message to Alert Sink Flowlet
- tcaAlertOutputEmitter.emit(alertMessage);
+ final ThresholdCalculatorOutput thresholdCalculatorOutput =
+ new ThresholdCalculatorOutput(processorContext.getMessage(),
+ TCAUtils.writeValueAsString(processorContext.getTCAPolicy()),
+ TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()),
+ alertMessage);
+ tcaAlertOutputEmitter.emit(thresholdCalculatorOutput);
} else {