/*
 * ===============================LICENSE_START======================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============================LICENSE_END===========================================
 */
\r
21 package org.openecomp.dcae.apod.analytics.common.utils;
\r
23 import com.google.common.base.Preconditions;
\r
24 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
\r
25 import org.openecomp.dcae.apod.analytics.common.service.filter.GenericJsonMessageFilter;
\r
26 import org.openecomp.dcae.apod.analytics.common.service.filter.JsonMessageFilterProcessorContext;
\r
27 import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
\r
28 import org.openecomp.dcae.apod.analytics.common.service.processor.MessageProcessor;
\r
29 import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessorContext;
\r
30 import org.slf4j.Logger;
\r
31 import org.slf4j.LoggerFactory;
\r
33 import java.util.Iterator;
\r
34 import java.util.LinkedList;
\r
35 import java.util.List;
\r
36 import java.util.Map;
\r
37 import java.util.Set;
\r
39 import javax.annotation.Nonnull;
\r
/**
 * Utility methods for working with chains of {@link MessageProcessor}s.
 *
 * @author Rajiv Singla . Creation Date: 11/8/2016.
 */
\r
46 public abstract class MessageProcessorUtils {
\r
48 private static final Logger LOG = LoggerFactory.getLogger(MessageProcessorUtils.class);
\r
51 * Provides an abstraction how to apply {@link ProcessorContext} to next {@link MessageProcessor}
\r
52 * in the message processor chain
\r
54 * @param <P> Sub classes of Processor Context
\r
56 public interface MessageProcessorFunction<P extends ProcessorContext> {
\r
59 * Method which provides accumulated {@link ProcessorContext} from previous processors and a reference
\r
60 * to next processor in the chain
\r
62 * @param p accumulated {@link ProcessorContext} from previous processors
\r
63 * @param m current {@link MessageProcessor} in the chain
\r
64 * @param <M> Message processor sub classes
\r
66 * @return processing context after computing the current Message Processor
\r
68 <M extends MessageProcessor<P>> P apply(P p, M m);
\r
73 * Provides an abstraction to compute a chain of {@link MessageProcessor}
\r
75 * @param messageProcessors An iterable containing one or more {@link MessageProcessor}s
\r
76 * @param initialProcessorContext An initial processing Context
\r
77 * @param messageProcessorFunction messageProcessor Function
\r
78 * @param <P> Sub classes for Processor Context
\r
80 * @return processing context which results after computing the whole chain
\r
82 public static <P extends ProcessorContext> P computeMessageProcessorChain(
\r
83 final Iterable<? extends MessageProcessor<P>> messageProcessors,
\r
84 final P initialProcessorContext,
\r
85 final MessageProcessorFunction<P> messageProcessorFunction) {
\r
87 // Get message processor iterator
\r
88 final Iterator<? extends MessageProcessor<P>> processorIterator = messageProcessors.iterator();
\r
90 // If no next message processor - return initial processor context
\r
91 if (!processorIterator.hasNext()) {
\r
92 return initialProcessorContext;
\r
95 // An accumulator for processor Context
\r
96 P processorContextAccumulator = initialProcessorContext;
\r
98 while (processorIterator.hasNext()) {
\r
100 final MessageProcessor<P> nextProcessor = processorIterator.next();
\r
102 // If Initial Processor Context is null
\r
103 if (processorContextAccumulator == null) {
\r
104 final String errorMessage =
\r
105 String.format("Processor Context must not be null for Message Process: %s",
\r
106 nextProcessor.getProcessorInfo().getProcessorName());
\r
107 throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
\r
111 if (!processorContextAccumulator.canProcessingContinue()) {
\r
112 LOG.debug("Triggering Early Termination, before Message Processor: {}, Incoming Message: {}",
\r
113 nextProcessor.getProcessorInfo().getProcessorName(), processorContextAccumulator.getMessage());
\r
116 processorContextAccumulator = messageProcessorFunction.apply(processorContextAccumulator, nextProcessor);
\r
119 return processorContextAccumulator;
\r
124 * Utility method to process Json Filter Mappings. Processes incoming json message and applies a list of json
\r
125 * filter mappings and returns the resulting {@link JsonMessageFilterProcessorContext}
\r
127 * @param jsonMessage json message to which filter mappings will be applies
\r
128 * @param jsonFilterMappings Filter mappings contains a Map containing keys as filter json path
\r
129 * and values as set of expected value corresponding to filter path
\r
131 * @return json message processor context which contains the {@link JsonMessageFilterProcessorContext#isMatched}
\r
132 * status after applying all filter mappings
\r
134 public static JsonMessageFilterProcessorContext processJsonFilterMappings(
\r
135 final String jsonMessage, @Nonnull final Map<String, Set<String>> jsonFilterMappings) {
\r
137 Preconditions.checkState(jsonFilterMappings.size() > 0, "Json Filter Mappings must not be empty");
\r
139 // create initial processor context containing the json message that need to be processed
\r
140 final JsonMessageFilterProcessorContext initialProcessorContext =
\r
141 new JsonMessageFilterProcessorContext(jsonMessage);
\r
143 // Create Json Message Filters
\r
144 final List<GenericJsonMessageFilter> jsonMessageFilters = new LinkedList<>();
\r
147 for (Map.Entry<String, Set<String>> jsonFilterMapping : jsonFilterMappings.entrySet()) {
\r
148 jsonMessageFilters.add(new GenericJsonMessageFilter("Filter-" + i, jsonFilterMapping.getKey(),
\r
149 jsonFilterMapping.getValue()));
\r
153 // Create Generic Message Chain Processor
\r
154 final GenericMessageChainProcessor<JsonMessageFilterProcessorContext> messageChainProcessor =
\r
155 new GenericMessageChainProcessor<>(jsonMessageFilters, initialProcessorContext);
\r
157 // Process chain and return resulting json Message Filter Processor Context
\r
158 return messageChainProcessor.processChain();
\r