67b7cb8713056b7a744fec450f1add1f9058022f
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2020-2021 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
23
24 import java.io.BufferedReader;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.util.Queue;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters;
33 import org.slf4j.ext.XLogger;
34 import org.slf4j.ext.XLoggerFactory;
35
36 /**
37  * The Class TextBlockReader reads the next block of text from an input stream.
38  *
39  * @author Liam Fallon (liam.fallon@ericsson.com)
40  */
41 public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable {
42     // The logger for this class
43     private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class);
44
45     // The amount of time to wait for input on the text block reader
46     private static final long TEXT_BLOCK_DELAY = 250;
47
48     // Tag for the start and end of text blocks
49     private final String blockStartToken;
50     private final String blockEndToken;
51
52     // Indicates that text block processing starts at the first block of text
53     private final boolean delimiterAtStart;
54     private boolean blockEndTokenUsed;
55
56     // The thread used to read the text from the stream
57     private Thread textConsumputionThread;
58
59     // The input stream for text
60     private InputStream inputStream;
61
62     // The lines of input read from the input stream
63     private final Queue<String> textLineQueue = new LinkedBlockingQueue<>();
64
65     // True while EOF has not been seen on input
66     private boolean eofOnInputStream = false;
67
68     /**
69      * Constructor, initialize the text block reader using token delimited event protocol parameters.
70      *
71      * @param tokenDelimitedParameters
72      *        the token delimited event protocol parameters
73      */
74     public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) {
75         this(tokenDelimitedParameters.getStartDelimiterToken(), tokenDelimitedParameters.getEndDelimiterToken(),
76                         tokenDelimitedParameters.isDelimiterAtStart());
77     }
78
79     /**
80      * Constructor, initialize the text block reader.
81      *
82      * @param blockStartToken
83      *        the block start token for the start of a text block
84      * @param blockEndToken
85      *        the block end token for the end of a text block
86      * @param delimiterAtStart
87      *        indicates that text block processing starts at the first block of text
88      */
89     public HeaderDelimitedTextBlockReader(final String blockStartToken, final String blockEndToken,
90                     final boolean delimiterAtStart) {
91         this.blockStartToken = blockStartToken;
92         this.delimiterAtStart = delimiterAtStart;
93
94         if (blockEndToken == null) {
95             this.blockEndToken = blockStartToken;
96             this.blockEndTokenUsed = false;
97         } else {
98             this.blockEndToken = blockEndToken;
99             this.blockEndTokenUsed = true;
100         }
101     }
102
103     /**
104      * {@inheritDoc}.
105      */
106     @Override
107     public void init(final InputStream incomingInputStream) {
108         this.inputStream = incomingInputStream;
109
110         // Configure and start the text reading thread
111         textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this);
112         textConsumputionThread.setDaemon(true);
113         textConsumputionThread.start();
114     }
115
116     /**
117      * {@inheritDoc}.
118      */
119     @Override
120     public TextBlock readTextBlock() throws IOException {
121         // Holder for the current text block
122         final StringBuilder textBlockBuilder = new StringBuilder();
123
124         // Wait for the timeout period if there is no input
125         if (!eofOnInputStream && textLineQueue.isEmpty()) {
126             ThreadUtilities.sleep(TEXT_BLOCK_DELAY);
127         }
128
129         // Scan the lines in the queue
130         while (!textLineQueue.isEmpty()) {
131             // Scroll down in the available lines looking for the start of the text block
132             if (!delimiterAtStart || textLineQueue.peek().startsWith(blockStartToken)) {
133                 // Process the input line header
134                 textBlockBuilder.append(textLineQueue.remove());
135                 textBlockBuilder.append('\n');
136                 break;
137             } else {
138                 String consumer = textLineQueue.remove();
139                 LOGGER.warn("invalid input on consumer: {}", consumer);
140             }
141         }
142
143         // Get the rest of the text document
144         while (!textLineQueue.isEmpty() && !textLineQueue.peek().startsWith(blockEndToken)
145                         && !textLineQueue.peek().startsWith(blockStartToken)) {
146             // We just strip out block end tokens because we use block start tokens to delimit the blocks of text
147             textBlockBuilder.append(textLineQueue.remove());
148             textBlockBuilder.append('\n');
149         }
150
151         // Check if we should add the block end token to the end of the text block
152         if (!textLineQueue.isEmpty() && blockEndTokenUsed && textLineQueue.peek().startsWith(blockEndToken)) {
153             // Process the input line header
154             textBlockBuilder.append(textLineQueue.remove());
155             textBlockBuilder.append('\n');
156         }
157
158         // Condition the text block and return it
159         final String textBlock = textBlockBuilder.toString().trim();
160         final boolean endOfText = eofOnInputStream && textLineQueue.isEmpty();
161
162         if (textBlock.length() > 0) {
163             return new TextBlock(endOfText, textBlock);
164         } else {
165             return new TextBlock(endOfText, null);
166         }
167     }
168
169     /**
170      * {@inheritDoc}.
171      */
172     @Override
173     public void run() {
174         try (BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream))) {
175             // Read the input line by line until we see end of file on the stream
176             String line;
177             while ((line = textReader.readLine()) != null) {
178                 textLineQueue.add(line);
179             }
180         } catch (final IOException e) {
181             LOGGER.warn("I/O exception on text input on consumer: ", e);
182         } finally {
183             eofOnInputStream = true;
184         }
185     }
186 }