2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
23 import java.io.BufferedReader;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.InputStreamReader;
27 import java.util.Queue;
28 import java.util.concurrent.LinkedBlockingQueue;
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters;
33 import org.slf4j.ext.XLogger;
34 import org.slf4j.ext.XLoggerFactory;
37 * The Class TextBlockReader reads the next block of text from an input stream.
39 * @author Liam Fallon (liam.fallon@ericsson.com)
41 public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable {
42 // The logger for this class
43 private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class);
45 // The amount of time to wait for input on the text block reader
46 private static final long TEXT_BLOCK_DELAY = 250;
48 // Tag for the start of a text block
49 private final String blockStartToken;
51 // The input stream for text
52 private InputStream inputStream;
54 // The lines of input read from the input stream
55 private final Queue<String> textLineQueue = new LinkedBlockingQueue<>();
57 // The thread used to read text from the input stream
58 private Thread textConsumputionThread;
60 // True while EOF has not been seen on input
61 private boolean eofOnInputStream = false;
64 * Constructor, initialize the text block reader.
66 * @param blockStartToken the block start token for the start of a text block
68 public HeaderDelimitedTextBlockReader(final String blockStartToken) {
69 this.blockStartToken = blockStartToken;
73 * Constructor, initialize the text block reader using token delimited event protocol
76 * @param tokenDelimitedParameters the token delimited event protocol parameters
78 public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) {
79 this.blockStartToken = tokenDelimitedParameters.getDelimiterToken();
86 * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#
87 * init( java.io.InputStream)
90 public void init(final InputStream incomingInputStream) {
91 this.inputStream = incomingInputStream;
93 // Configure and start the text reading thread
94 textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this);
95 textConsumputionThread.setDaemon(true);
96 textConsumputionThread.start();
103 * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#
107 public TextBlock readTextBlock() throws IOException {
108 // Holder for the current text block
109 final StringBuilder textBlockBuilder = new StringBuilder();
111 // Wait for the timeout period if there is no input
112 if (!eofOnInputStream && textLineQueue.size() == 0) {
113 ThreadUtilities.sleep(TEXT_BLOCK_DELAY);
116 // Scan the lines in the queue
117 while (textLineQueue.size() > 0) {
118 // Scroll down in the available lines looking for the start of the text block
119 if (textLineQueue.peek().startsWith(blockStartToken)) {
120 // Process the input line header
121 textBlockBuilder.append(textLineQueue.remove());
122 textBlockBuilder.append('\n');
125 LOGGER.warn("invalid input on consumer: " + textLineQueue.remove());
129 // Get the rest of the text document
130 while (textLineQueue.size() > 0 && !textLineQueue.peek().startsWith(blockStartToken)) {
131 textBlockBuilder.append(textLineQueue.remove());
132 textBlockBuilder.append('\n');
135 // Condition the text block and return it
136 final String textBlock = textBlockBuilder.toString().trim();
137 final boolean endOfText = (eofOnInputStream && textLineQueue.size() == 0 ? true : false);
139 if (textBlock.length() > 0) {
140 return new TextBlock(endOfText, textBlock);
142 return new TextBlock(endOfText, null);
149 * @see java.lang.Runnable#run()
153 final BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream));
156 // Read the input line by line until we see end of file on the stream
158 while ((line = textReader.readLine()) != null) {
159 textLineQueue.add(line);
161 } catch (final IOException e) {
162 LOGGER.warn("I/O exception on text input on consumer: ", e);
164 eofOnInputStream = true;