Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-common / src / main / java / org / openecomp / dcae / apod / analytics / common / utils / MessageProcessorUtils.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.common.utils;\r
22 \r
23 import com.google.common.base.Preconditions;\r
24 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;\r
25 import org.openecomp.dcae.apod.analytics.common.service.filter.GenericJsonMessageFilter;\r
26 import org.openecomp.dcae.apod.analytics.common.service.filter.JsonMessageFilterProcessorContext;\r
27 import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;\r
28 import org.openecomp.dcae.apod.analytics.common.service.processor.MessageProcessor;\r
29 import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessorContext;\r
30 import org.slf4j.Logger;\r
31 import org.slf4j.LoggerFactory;\r
32 \r
33 import java.util.Iterator;\r
34 import java.util.LinkedList;\r
35 import java.util.List;\r
36 import java.util.Map;\r
37 import java.util.Set;\r
38 \r
39 import javax.annotation.Nonnull;\r
40 \r
41 \r
42 /**\r
43  *\r
44  * @author Rajiv Singla . Creation Date: 11/8/2016.\r
45  */\r
46 public abstract class MessageProcessorUtils {\r
47 \r
48     private static final Logger LOG = LoggerFactory.getLogger(MessageProcessorUtils.class);\r
49 \r
50     /**\r
51      * Provides an abstraction how to apply {@link ProcessorContext} to next {@link MessageProcessor}\r
52      * in the message processor chain\r
53      *\r
54      * @param <P> Sub classes of Processor Context\r
55      */\r
56     public interface MessageProcessorFunction<P extends ProcessorContext> {\r
57 \r
58         /**\r
59          * Method which provides accumulated {@link ProcessorContext} from previous processors and a reference\r
60          * to next processor in the chain\r
61          *\r
62          * @param p accumulated {@link ProcessorContext} from previous processors\r
63          * @param m current {@link MessageProcessor} in the chain\r
64          * @param <M> Message processor sub classes\r
65          *\r
66          * @return processing context after computing the current Message Processor\r
67          */\r
68         <M extends MessageProcessor<P>> P apply(P p, M m);\r
69     }\r
70 \r
71 \r
72     /**\r
73      * Provides an abstraction to compute a chain of {@link MessageProcessor}\r
74      *\r
75      * @param messageProcessors An iterable containing one or more {@link MessageProcessor}s\r
76      * @param initialProcessorContext An initial processing Context\r
77      * @param messageProcessorFunction messageProcessor Function\r
78      * @param <P> Sub classes for Processor Context\r
79      *\r
80      * @return processing context which results after computing the whole chain\r
81      */\r
82     public static <P extends ProcessorContext> P computeMessageProcessorChain(\r
83             final Iterable<? extends MessageProcessor<P>> messageProcessors,\r
84             final P initialProcessorContext,\r
85             final MessageProcessorFunction<P> messageProcessorFunction) {\r
86 \r
87         // Get message processor iterator\r
88         final Iterator<? extends MessageProcessor<P>> processorIterator = messageProcessors.iterator();\r
89 \r
90         // If no next message processor - return initial processor context\r
91         if (!processorIterator.hasNext()) {\r
92             return initialProcessorContext;\r
93         }\r
94 \r
95         // An accumulator for processor Context\r
96         P processorContextAccumulator = initialProcessorContext;\r
97 \r
98         while (processorIterator.hasNext()) {\r
99 \r
100             final MessageProcessor<P> nextProcessor = processorIterator.next();\r
101 \r
102             // If Initial Processor Context is null\r
103             if (processorContextAccumulator == null) {\r
104                 final String errorMessage =\r
105                         String.format("Processor Context must not be null for Message Process: %s",\r
106                                 nextProcessor.getProcessorInfo().getProcessorName());\r
107                 throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
108             }\r
109 \r
110 \r
111             if (!processorContextAccumulator.canProcessingContinue()) {\r
112                 LOG.debug("Triggering Early Termination, before Message Processor: {}, Incoming Message: {}",\r
113                         nextProcessor.getProcessorInfo().getProcessorName(), processorContextAccumulator.getMessage());\r
114                 break;\r
115             }\r
116             processorContextAccumulator = messageProcessorFunction.apply(processorContextAccumulator, nextProcessor);\r
117         }\r
118 \r
119         return processorContextAccumulator;\r
120     }\r
121 \r
122 \r
123     /**\r
124      * Utility method to process Json Filter Mappings. Processes incoming json message and applies a list of json\r
125      * filter mappings and returns the resulting {@link JsonMessageFilterProcessorContext}\r
126      *\r
127      * @param jsonMessage json message to which filter mappings will be applies\r
128      * @param jsonFilterMappings Filter mappings contains a Map containing keys as filter json path\r
129      * and values as set of expected value corresponding to filter path\r
130      *\r
131      * @return json message processor context which contains the {@link JsonMessageFilterProcessorContext#isMatched}\r
132      * status after applying all filter mappings\r
133      */\r
134     public static JsonMessageFilterProcessorContext processJsonFilterMappings(\r
135             final String jsonMessage, @Nonnull final Map<String, Set<String>> jsonFilterMappings) {\r
136 \r
137         Preconditions.checkState(jsonFilterMappings.size() > 0, "Json Filter Mappings must not be empty");\r
138 \r
139         // create initial processor context containing the json message that need to be processed\r
140         final JsonMessageFilterProcessorContext initialProcessorContext =\r
141                 new JsonMessageFilterProcessorContext(jsonMessage);\r
142 \r
143         // Create Json Message Filters\r
144         final List<GenericJsonMessageFilter> jsonMessageFilters = new LinkedList<>();\r
145 \r
146         int i = 0;\r
147         for (Map.Entry<String, Set<String>> jsonFilterMapping : jsonFilterMappings.entrySet()) {\r
148             jsonMessageFilters.add(new GenericJsonMessageFilter("Filter-" + i, jsonFilterMapping.getKey(),\r
149                     jsonFilterMapping.getValue()));\r
150             i++;\r
151         }\r
152 \r
153         // Create Generic Message Chain Processor\r
154         final GenericMessageChainProcessor<JsonMessageFilterProcessorContext> messageChainProcessor =\r
155                 new GenericMessageChainProcessor<>(jsonMessageFilters, initialProcessorContext);\r
156 \r
157         // Process chain and return resulting json Message Filter Processor Context\r
158         return messageChainProcessor.processChain();\r
159     }\r
160 \r
161 \r
162 }\r