2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2020-2021 Nordix Foundation.
5 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
25 import java.io.BufferedReader;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.InputStreamReader;
29 import java.util.Queue;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters;
34 import org.slf4j.ext.XLogger;
35 import org.slf4j.ext.XLoggerFactory;
38 * The Class TextBlockReader reads the next block of text from an input stream.
40 * @author Liam Fallon (liam.fallon@ericsson.com)
42 public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable {
43 // The logger for this class
44 private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class);
46 // The amount of time to wait for input on the text block reader
47 private static final long TEXT_BLOCK_DELAY = 250;
49 // Tag for the start and end of text blocks
50 private final String blockStartToken;
51 private final String blockEndToken;
53 // Indicates that text block processing starts at the first block of text
54 private final boolean delimiterAtStart;
55 private boolean blockEndTokenUsed;
57 // The input stream for text
58 private InputStream inputStream;
60 // The lines of input read from the input stream
61 private final Queue<String> textLineQueue = new LinkedBlockingQueue<>();
63 // True while EOF has not been seen on input
64 private boolean eofOnInputStream = false;
67 * Constructor, initialize the text block reader using token delimited event protocol parameters.
69 * @param tokenDelimitedParameters
70 * the token delimited event protocol parameters
72 public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) {
73 this(tokenDelimitedParameters.getStartDelimiterToken(), tokenDelimitedParameters.getEndDelimiterToken(),
74 tokenDelimitedParameters.isDelimiterAtStart());
78 * Constructor, initialize the text block reader.
80 * @param blockStartToken
81 * the block start token for the start of a text block
82 * @param blockEndToken
83 * the block end token for the end of a text block
84 * @param delimiterAtStart
85 * indicates that text block processing starts at the first block of text
87 public HeaderDelimitedTextBlockReader(final String blockStartToken, final String blockEndToken,
88 final boolean delimiterAtStart) {
89 this.blockStartToken = blockStartToken;
90 this.delimiterAtStart = delimiterAtStart;
92 if (blockEndToken == null) {
93 this.blockEndToken = blockStartToken;
94 this.blockEndTokenUsed = false;
96 this.blockEndToken = blockEndToken;
97 this.blockEndTokenUsed = true;
105 public void init(final InputStream incomingInputStream) {
106 this.inputStream = incomingInputStream;
108 // Configure and start the text reading thread
109 Thread textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this);
110 textConsumputionThread.setDaemon(true);
111 textConsumputionThread.start();
118 public TextBlock readTextBlock() throws IOException {
119 // Holder for the current text block
120 final StringBuilder textBlockBuilder = new StringBuilder();
122 // Wait for the timeout period if there is no input
123 if (!eofOnInputStream && textLineQueue.isEmpty()) {
124 ThreadUtilities.sleep(TEXT_BLOCK_DELAY);
127 // Scan the lines in the queue
128 while (!textLineQueue.isEmpty()) {
129 // Scroll down in the available lines looking for the start of the text block
130 if (!delimiterAtStart || textLineQueue.peek().startsWith(blockStartToken)) {
131 // Process the input line header
132 textBlockBuilder.append(textLineQueue.remove());
133 textBlockBuilder.append('\n');
136 String consumer = textLineQueue.remove();
137 LOGGER.warn("invalid input on consumer: {}", consumer);
141 // Get the rest of the text document
142 while (!textLineQueue.isEmpty() && !textLineQueue.peek().startsWith(blockEndToken)
143 && !textLineQueue.peek().startsWith(blockStartToken)) {
144 // We just strip out block end tokens because we use block start tokens to delimit the blocks of text
145 textBlockBuilder.append(textLineQueue.remove());
146 textBlockBuilder.append('\n');
149 // Check if we should add the block end token to the end of the text block
150 if (!textLineQueue.isEmpty() && blockEndTokenUsed && textLineQueue.peek().startsWith(blockEndToken)) {
151 // Process the input line header
152 textBlockBuilder.append(textLineQueue.remove());
153 textBlockBuilder.append('\n');
156 // Condition the text block and return it
157 final String textBlock = textBlockBuilder.toString().trim();
158 final boolean endOfText = eofOnInputStream && textLineQueue.isEmpty();
160 if (textBlock.length() > 0) {
161 return new TextBlock(endOfText, textBlock);
163 return new TextBlock(endOfText, null);
172 try (BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream))) {
173 // Read the input line by line until we see end of file on the stream
175 while ((line = textReader.readLine()) != null) {
176 textLineQueue.add(line);
178 } catch (final IOException e) {
179 LOGGER.warn("I/O exception on text input on consumer: ", e);
181 eofOnInputStream = true;