70b1e31c6c3f42d483517379a4fc7fd3d2a7af95
[dcaegen2/analytics/tca.git] / dcae-analytics-common / src / main / java / org / openecomp / dcae / apod / analytics / common / service / processor / AbstractMessageProcessor.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.common.service.processor;\r
22 \r
23 import com.google.common.base.Optional;\r
24 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;\r
25 import org.slf4j.Logger;\r
26 import org.slf4j.LoggerFactory;\r
27 \r
28 import javax.annotation.Nonnull;\r
29 \r
30 import static java.lang.String.format;\r
31 \r
32 /**\r
33  * An abstract Message Processor which can be extended by {@link MessageProcessor} implementations\r
34  * to get default behavior for Message Processors\r
35  *\r
36  * @param <P> Processor Context sub classes\r
37  *\r
38  * @author Rajiv Singla . Creation Date: 11/8/2016.\r
39  */\r
40 public abstract class AbstractMessageProcessor<P extends ProcessorContext> implements MessageProcessor<P> {\r
41 \r
42     private static final Logger LOG = LoggerFactory.getLogger(AbstractMessageProcessor.class);\r
43 \r
44     /**\r
45      * By Default there is no processing message\r
46      */\r
47     private String processingMessage = null;\r
48 \r
49     /**\r
50      * By Default Processing State is set to not required - subclasses must\r
51      * set processing state to {@link ProcessingState#PROCESSING_FINISHED_SUCCESSFULLY} on successful processing\r
52      * or {@link ProcessingState#PROCESSING_TERMINATED_EARLY} if processing fails\r
53      */\r
54     protected ProcessingState processingState = ProcessingState.PROCESSING_NOT_REQUIRED;\r
55 \r
56     /**\r
57      * Sub classes must provide a description of a processor\r
58      *\r
59      * @return description of processor\r
60      *\r
61      */\r
62     public abstract String getProcessorDescription();\r
63 \r
64 \r
65     /**\r
66      * Sub classes must provide implementation to process Message\r
67      *\r
68      * @param processorContext incoming {@link ProcessorContext}\r
69      * @return outgoing {@link ProcessorContext}\r
70      */\r
71     public abstract P processMessage(P processorContext);\r
72 \r
73     @Override\r
74     public ProcessorInfo getProcessorInfo() {\r
75         // by default the class of the Processor is assigned as Processor Name\r
76         final String processorClassName = getClass().getSimpleName();\r
77         return new GenericProcessorInfo(processorClassName, getProcessorDescription());\r
78     }\r
79 \r
80     @Override\r
81     public P preProcessor(P processorContext) {\r
82         LOG.debug("Processing Started for Processor: {}", getProcessorInfo().getProcessorName());\r
83         // by default check to see if continue processing Flag is not false\r
84         final boolean okToContinue = processorContext.canProcessingContinue();\r
85         if (!okToContinue) {\r
86             final String errorMessage =\r
87                     format("Processor: %s. Processing Context flag okToContinue is false. Unable to proceed...",\r
88                             getProcessorInfo().getProcessorName());\r
89             throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
90         }\r
91         processingState = ProcessingState.PROCESSING_STARTED;\r
92         return processorContext;\r
93     }\r
94 \r
95     @Override\r
96     public ProcessingState getProcessingState() {\r
97         return processingState;\r
98     }\r
99 \r
100     @Override\r
101     public Optional<String> getProcessingMessage() {\r
102         return Optional.fromNullable(processingMessage);\r
103     }\r
104 \r
105     @Override\r
106     public P postProcessor(P processorContext) {\r
107         // Default implementation updates the post processing flag if processing did not\r
108         // completed successfully\r
109         if (processingState != ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY) {\r
110             LOG.debug("Processor: {}, Update Process Context State to stop Processing.",\r
111                     getProcessorInfo().getProcessorName());\r
112             processorContext.setProcessingContinueFlag(false);\r
113         }\r
114         // attaches itself to message processor context\r
115         processorContext.getMessageProcessors().add(this);\r
116         LOG.debug("Processing Completed for Processor: {}", getProcessorInfo());\r
117         return processorContext;\r
118     }\r
119 \r
120 \r
121     @Override\r
122     public final P apply(@Nonnull P processorContext) {\r
123         final P preProcessedProcessorContext = preProcessor(processorContext);\r
124         final P processedProcessorContext = processMessage(preProcessedProcessorContext);\r
125         return postProcessor(processedProcessorContext);\r
126     }\r
127 \r
128 \r
129     /**\r
130      * Helper method that updates processing state in case of early termination, logs the processing\r
131      * termination reason, updates Processor processing state as Terminated and sets it processing message\r
132      *\r
133      * @param terminatingMessage error Message\r
134      * @param processorContext message processor context\r
135      */\r
136     protected void setTerminatingProcessingMessage(final String terminatingMessage,\r
137                                                    final P processorContext) {\r
138 \r
139         final String message = processorContext.getMessage();\r
140         this.processingState = ProcessingState.PROCESSING_TERMINATED_EARLY;\r
141         this.processingMessage = terminatingMessage;\r
142         LOG.debug("Processor: {}, Early Terminating Message: {}, Incoming Message: {}",\r
143                 getProcessorInfo().getProcessorName(), terminatingMessage, message);\r
144     }\r
145 \r
146     /**\r
147      * Helper method that updates Processing state and logs completion message\r
148      * passed\r
149      *\r
150      * @param processorPassingMessage Processor passing message\r
151      * @param processorContext message processor context\r
152      */\r
153     protected void setFinishedProcessingMessage(final String processorPassingMessage, P processorContext) {\r
154         final String message = processorContext.getMessage();\r
155         processingState = ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY;\r
156         this.processingMessage = processorPassingMessage;\r
157         LOG.debug("Processor: {}, Successful Completion Message: {}, Incoming Message: {}",\r
158                 getProcessorInfo().getProcessorName(), processorPassingMessage, message);\r
159     }\r
160 \r
161 \r
162 }\r