Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-common / src / main / java / org / openecomp / dcae / apod / analytics / common / service / processor / AbstractMessageProcessor.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.common.service.processor;
22
23 import com.google.common.base.Optional;
24 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import javax.annotation.Nonnull;
29
30 import static java.lang.String.format;
31
32 /**
33  * An abstract Message Processor which can be extended by {@link MessageProcessor} implementations
34  * to get default behavior for Message Processors
35  *
36  * @param <P> Processor Context sub classes
37  *
38  * @author Rajiv Singla . Creation Date: 11/8/2016.
39  */
40 public abstract class AbstractMessageProcessor<P extends ProcessorContext> implements MessageProcessor<P> {
41
42     private static final Logger LOG = LoggerFactory.getLogger(AbstractMessageProcessor.class);
43
44     /**
45      * By Default there is no processing message
46      */
47     private String processingMessage = null;
48
49     /**
50      * By Default Processing State is set to not required - subclasses must
51      * set processing state to {@link ProcessingState#PROCESSING_FINISHED_SUCCESSFULLY} on successful processing
52      * or {@link ProcessingState#PROCESSING_TERMINATED_EARLY} if processing fails
53      */
54     protected ProcessingState processingState = ProcessingState.PROCESSING_NOT_REQUIRED;
55
56     /**
57      * Sub classes must provide a description of a processor
58      *
59      * @return description of processor
60      *
61      */
62     public abstract String getProcessorDescription();
63
64
65     /**
66      * Sub classes must provide implementation to process Message
67      *
68      * @param processorContext incoming {@link ProcessorContext}
69      * @return outgoing {@link ProcessorContext}
70      */
71     public abstract P processMessage(P processorContext);
72
73     @Override
74     public ProcessorInfo getProcessorInfo() {
75         // by default the class of the Processor is assigned as Processor Name
76         final String processorClassName = getClass().getSimpleName();
77         return new GenericProcessorInfo(processorClassName, getProcessorDescription());
78     }
79
80     @Override
81     public P preProcessor(P processorContext) {
82         LOG.debug("Processing Started for Processor: {}", getProcessorInfo().getProcessorName());
83         // by default check to see if continue processing Flag is not false
84         final boolean okToContinue = processorContext.canProcessingContinue();
85         if (!okToContinue) {
86             final String errorMessage =
87                     format("Processor: %s. Processing Context flag okToContinue is false. Unable to proceed...",
88                             getProcessorInfo().getProcessorName());
89             throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
90         }
91         processingState = ProcessingState.PROCESSING_STARTED;
92         return processorContext;
93     }
94
95     @Override
96     public ProcessingState getProcessingState() {
97         return processingState;
98     }
99
100     @Override
101     public Optional<String> getProcessingMessage() {
102         return Optional.fromNullable(processingMessage);
103     }
104
105     @Override
106     public P postProcessor(P processorContext) {
107         // Default implementation updates the post processing flag if processing did not
108         // completed successfully
109         if (processingState != ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY) {
110             LOG.debug("Processor: {}, Update Process Context State to stop Processing.",
111                     getProcessorInfo().getProcessorName());
112             processorContext.setProcessingContinueFlag(false);
113         }
114         // attaches itself to message processor context
115         processorContext.getMessageProcessors().add(this);
116         LOG.debug("Processing Completed for Processor: {}", getProcessorInfo());
117         return processorContext;
118     }
119
120
121     @Override
122     public final P apply(@Nonnull P processorContext) {
123         final P preProcessedProcessorContext = preProcessor(processorContext);
124         final P processedProcessorContext = processMessage(preProcessedProcessorContext);
125         return postProcessor(processedProcessorContext);
126     }
127
128
129     /**
130      * Helper method that updates processing state in case of early termination, logs the processing
131      * termination reason, updates Processor processing state as Terminated and sets it processing message
132      *
133      * @param terminatingMessage error Message
134      * @param processorContext message processor context
135      */
136     protected void setTerminatingProcessingMessage(final String terminatingMessage,
137                                                    final P processorContext) {
138
139         final String message = processorContext.getMessage();
140         this.processingState = ProcessingState.PROCESSING_TERMINATED_EARLY;
141         this.processingMessage = terminatingMessage;
142         LOG.debug("Processor: {}, Early Terminating Message: {}, Incoming Message: {}",
143                 getProcessorInfo().getProcessorName(), terminatingMessage, message);
144     }
145
146     /**
147      * Helper method that updates Processing state and logs completion message
148      * passed
149      *
150      * @param processorPassingMessage Processor passing message
151      * @param processorContext message processor context
152      */
153     protected void setFinishedProcessingMessage(final String processorPassingMessage, P processorContext) {
154         final String message = processorContext.getMessage();
155         processingState = ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY;
156         this.processingMessage = processorPassingMessage;
157         LOG.debug("Processor: {}, Successful Completion Message: {}, Incoming Message: {}",
158                 getProcessorInfo().getProcessorName(), processorPassingMessage, message);
159     }
160
161
162 }