c2f14a261ab2c1c6d393343242d2893ebdc40c38
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2020-2021 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
24
25 import java.io.BufferedReader;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.InputStreamReader;
29 import java.util.Queue;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters;
34 import org.slf4j.ext.XLogger;
35 import org.slf4j.ext.XLoggerFactory;
36
37 /**
38  * The Class TextBlockReader reads the next block of text from an input stream.
39  *
40  * @author Liam Fallon (liam.fallon@ericsson.com)
41  */
42 public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable {
43     // The logger for this class
44     private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class);
45
46     // The amount of time to wait for input on the text block reader
47     private static final long TEXT_BLOCK_DELAY = 250;
48
49     // Tag for the start and end of text blocks
50     private final String blockStartToken;
51     private final String blockEndToken;
52
53     // Indicates that text block processing starts at the first block of text
54     private final boolean delimiterAtStart;
55     private boolean blockEndTokenUsed;
56
57     // The input stream for text
58     private InputStream inputStream;
59
60     // The lines of input read from the input stream
61     private final Queue<String> textLineQueue = new LinkedBlockingQueue<>();
62
63     // True while EOF has not been seen on input
64     private boolean eofOnInputStream = false;
65
66     /**
67      * Constructor, initialize the text block reader using token delimited event protocol parameters.
68      *
69      * @param tokenDelimitedParameters
70      *        the token delimited event protocol parameters
71      */
72     public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) {
73         this(tokenDelimitedParameters.getStartDelimiterToken(), tokenDelimitedParameters.getEndDelimiterToken(),
74                         tokenDelimitedParameters.isDelimiterAtStart());
75     }
76
77     /**
78      * Constructor, initialize the text block reader.
79      *
80      * @param blockStartToken
81      *        the block start token for the start of a text block
82      * @param blockEndToken
83      *        the block end token for the end of a text block
84      * @param delimiterAtStart
85      *        indicates that text block processing starts at the first block of text
86      */
87     public HeaderDelimitedTextBlockReader(final String blockStartToken, final String blockEndToken,
88                     final boolean delimiterAtStart) {
89         this.blockStartToken = blockStartToken;
90         this.delimiterAtStart = delimiterAtStart;
91
92         if (blockEndToken == null) {
93             this.blockEndToken = blockStartToken;
94             this.blockEndTokenUsed = false;
95         } else {
96             this.blockEndToken = blockEndToken;
97             this.blockEndTokenUsed = true;
98         }
99     }
100
101     /**
102      * {@inheritDoc}.
103      */
104     @Override
105     public void init(final InputStream incomingInputStream) {
106         this.inputStream = incomingInputStream;
107
108         // Configure and start the text reading thread
109         Thread textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this);
110         textConsumputionThread.setDaemon(true);
111         textConsumputionThread.start();
112     }
113
114     /**
115      * {@inheritDoc}.
116      */
117     @Override
118     public TextBlock readTextBlock() throws IOException {
119         // Holder for the current text block
120         final StringBuilder textBlockBuilder = new StringBuilder();
121
122         // Wait for the timeout period if there is no input
123         if (!eofOnInputStream && textLineQueue.isEmpty()) {
124             ThreadUtilities.sleep(TEXT_BLOCK_DELAY);
125         }
126
127         // Scan the lines in the queue
128         while (!textLineQueue.isEmpty()) {
129             // Scroll down in the available lines looking for the start of the text block
130             if (!delimiterAtStart || textLineQueue.peek().startsWith(blockStartToken)) {
131                 // Process the input line header
132                 textBlockBuilder.append(textLineQueue.remove());
133                 textBlockBuilder.append('\n');
134                 break;
135             } else {
136                 String consumer = textLineQueue.remove();
137                 LOGGER.warn("invalid input on consumer: {}", consumer);
138             }
139         }
140
141         // Get the rest of the text document
142         while (!textLineQueue.isEmpty() && !textLineQueue.peek().startsWith(blockEndToken)
143                         && !textLineQueue.peek().startsWith(blockStartToken)) {
144             // We just strip out block end tokens because we use block start tokens to delimit the blocks of text
145             textBlockBuilder.append(textLineQueue.remove());
146             textBlockBuilder.append('\n');
147         }
148
149         // Check if we should add the block end token to the end of the text block
150         if (!textLineQueue.isEmpty() && blockEndTokenUsed && textLineQueue.peek().startsWith(blockEndToken)) {
151             // Process the input line header
152             textBlockBuilder.append(textLineQueue.remove());
153             textBlockBuilder.append('\n');
154         }
155
156         // Condition the text block and return it
157         final String textBlock = textBlockBuilder.toString().trim();
158         final boolean endOfText = eofOnInputStream && textLineQueue.isEmpty();
159
160         if (textBlock.length() > 0) {
161             return new TextBlock(endOfText, textBlock);
162         } else {
163             return new TextBlock(endOfText, null);
164         }
165     }
166
167     /**
168      * {@inheritDoc}.
169      */
170     @Override
171     public void run() {
172         try (BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream))) {
173             // Read the input line by line until we see end of file on the stream
174             String line;
175             while ((line = textReader.readLine()) != null) {
176                 textLineQueue.add(line);
177             }
178         } catch (final IOException e) {
179             LOGGER.warn("I/O exception on text input on consumer: ", e);
180         } finally {
181             eofOnInputStream = true;
182         }
183     }
184 }