Changes for checkstyle 8.32
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / event / impl / filecarrierplugin / consumer / HeaderDelimitedTextBlockReader.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
22
23 import java.io.BufferedReader;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.InputStreamReader;
27 import java.util.Queue;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters;
32 import org.slf4j.ext.XLogger;
33 import org.slf4j.ext.XLoggerFactory;
34
35 /**
36  * The Class TextBlockReader reads the next block of text from an input stream.
37  *
38  * @author Liam Fallon (liam.fallon@ericsson.com)
39  */
40 public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable {
41     // The logger for this class
42     private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class);
43
44     // The amount of time to wait for input on the text block reader
45     private static final long TEXT_BLOCK_DELAY = 250;
46
47     // Tag for the start and end of text blocks
48     private final String blockStartToken;
49     private final String blockEndToken;
50
51     // Indicates that text block processing starts at the first block of text
52     private final boolean delimiterAtStart;
53     private boolean blockEndTokenUsed = false;
54
55     // The thread used to read the text from the stream
56     private Thread textConsumputionThread;
57
58     // The input stream for text
59     private InputStream inputStream;
60
61     // The lines of input read from the input stream
62     private final Queue<String> textLineQueue = new LinkedBlockingQueue<>();
63
64     // True while EOF has not been seen on input
65     private boolean eofOnInputStream = false;
66
67     /**
68      * Constructor, initialize the text block reader using token delimited event protocol parameters.
69      *
70      * @param tokenDelimitedParameters
71      *        the token delimited event protocol parameters
72      */
73     public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) {
74         this(tokenDelimitedParameters.getStartDelimiterToken(), tokenDelimitedParameters.getEndDelimiterToken(),
75                         tokenDelimitedParameters.isDelimiterAtStart());
76     }
77
78     /**
79      * Constructor, initialize the text block reader.
80      *
81      * @param blockStartToken
82      *        the block start token for the start of a text block
83      * @param blockEndToken
84      *        the block end token for the end of a text block
85      * @param delimiterAtStart
86      *        indicates that text block processing starts at the first block of text
87      */
88     public HeaderDelimitedTextBlockReader(final String blockStartToken, final String blockEndToken,
89                     final boolean delimiterAtStart) {
90         this.blockStartToken = blockStartToken;
91         this.delimiterAtStart = delimiterAtStart;
92
93         if (blockEndToken == null) {
94             this.blockEndToken = blockStartToken;
95             this.blockEndTokenUsed = false;
96         } else {
97             this.blockEndToken = blockEndToken;
98             this.blockEndTokenUsed = true;
99         }
100     }
101
102     /**
103      * {@inheritDoc}.
104      */
105     @Override
106     public void init(final InputStream incomingInputStream) {
107         this.inputStream = incomingInputStream;
108
109         // Configure and start the text reading thread
110         textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this);
111         textConsumputionThread.setDaemon(true);
112         textConsumputionThread.start();
113     }
114
115     /**
116      * {@inheritDoc}.
117      */
118     @Override
119     public TextBlock readTextBlock() throws IOException {
120         // Holder for the current text block
121         final StringBuilder textBlockBuilder = new StringBuilder();
122
123         // Wait for the timeout period if there is no input
124         if (!eofOnInputStream && textLineQueue.isEmpty()) {
125             ThreadUtilities.sleep(TEXT_BLOCK_DELAY);
126         }
127
128         // Scan the lines in the queue
129         while (!textLineQueue.isEmpty()) {
130             // Scroll down in the available lines looking for the start of the text block
131             if (!delimiterAtStart || textLineQueue.peek().startsWith(blockStartToken)) {
132                 // Process the input line header
133                 textBlockBuilder.append(textLineQueue.remove());
134                 textBlockBuilder.append('\n');
135                 break;
136             } else {
137                 String consumer = textLineQueue.remove();
138                 LOGGER.warn("invalid input on consumer: {}", consumer);
139             }
140         }
141
142         // Get the rest of the text document
143         while (!textLineQueue.isEmpty() && !textLineQueue.peek().startsWith(blockEndToken)
144                         && !textLineQueue.peek().startsWith(blockStartToken)) {
145             // We just strip out block end tokens because we use block start tokens to delimit the blocks of text
146             textBlockBuilder.append(textLineQueue.remove());
147             textBlockBuilder.append('\n');
148         }
149
150         // Check if we should add the block end token to the end of the text block
151         if (!textLineQueue.isEmpty() && blockEndTokenUsed && textLineQueue.peek().startsWith(blockEndToken)) {
152             // Process the input line header
153             textBlockBuilder.append(textLineQueue.remove());
154             textBlockBuilder.append('\n');
155         }
156
157         // Condition the text block and return it
158         final String textBlock = textBlockBuilder.toString().trim();
159         final boolean endOfText = eofOnInputStream && textLineQueue.isEmpty();
160
161         if (textBlock.length() > 0) {
162             return new TextBlock(endOfText, textBlock);
163         } else {
164             return new TextBlock(endOfText, null);
165         }
166     }
167
168     /**
169      * {@inheritDoc}.
170      */
171     @Override
172     public void run() {
173         final BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream));
174
175         try {
176             // Read the input line by line until we see end of file on the stream
177             String line;
178             while ((line = textReader.readLine()) != null) {
179                 textLineQueue.add(line);
180             }
181         } catch (final IOException e) {
182             LOGGER.warn("I/O exception on text input on consumer: ", e);
183         } finally {
184             eofOnInputStream = true;
185         }
186     }
187 }