e40bc756cbcb6c7cc7afa199377225ffbc4e63e5
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
22
23 import java.io.BufferedReader;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.InputStreamReader;
27 import java.util.Queue;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters;
33 import org.slf4j.ext.XLogger;
34 import org.slf4j.ext.XLoggerFactory;
35
36 /**
37  * The Class TextBlockReader reads the next block of text from an input stream.
38  *
39  * @author Liam Fallon (liam.fallon@ericsson.com)
40  */
41 public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable {
42     // The logger for this class
43     private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class);
44
45     // The amount of time to wait for input on the text block reader
46     private static final long TEXT_BLOCK_DELAY = 250;
47
48     // Tag for the start of a text block
49     private final String blockStartToken;
50
51     // The input stream for text
52     private InputStream inputStream;
53
54     // The lines of input read from the input stream
55     private final Queue<String> textLineQueue = new LinkedBlockingQueue<>();
56
57     // The thread used to read text from the input stream
58     private Thread textConsumputionThread;
59
60     // True while EOF has not been seen on input
61     private boolean eofOnInputStream = false;
62
63     /**
64      * Constructor, initialize the text block reader.
65      *
66      * @param blockStartToken the block start token for the start of a text block
67      */
68     public HeaderDelimitedTextBlockReader(final String blockStartToken) {
69         this.blockStartToken = blockStartToken;
70     }
71
72     /**
73      * Constructor, initialize the text block reader using token delimited event protocol
74      * parameters.
75      *
76      * @param tokenDelimitedParameters the token delimited event protocol parameters
77      */
78     public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) {
79         this.blockStartToken = tokenDelimitedParameters.getDelimiterToken();
80     }
81
82     /*
83      * (non-Javadoc)
84      * 
85      * @see
86      * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#
87      * init( java.io.InputStream)
88      */
89     @Override
90     public void init(final InputStream incomingInputStream) {
91         this.inputStream = incomingInputStream;
92
93         // Configure and start the text reading thread
94         textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this);
95         textConsumputionThread.setDaemon(true);
96         textConsumputionThread.start();
97     }
98
99     /*
100      * (non-Javadoc)
101      * 
102      * @see
103      * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#
104      * readTextBlock()
105      */
106     @Override
107     public TextBlock readTextBlock() throws IOException {
108         // Holder for the current text block
109         final StringBuilder textBlockBuilder = new StringBuilder();
110
111         // Wait for the timeout period if there is no input
112         if (!eofOnInputStream && textLineQueue.size() == 0) {
113             ThreadUtilities.sleep(TEXT_BLOCK_DELAY);
114         }
115
116         // Scan the lines in the queue
117         while (textLineQueue.size() > 0) {
118             // Scroll down in the available lines looking for the start of the text block
119             if (textLineQueue.peek().startsWith(blockStartToken)) {
120                 // Process the input line header
121                 textBlockBuilder.append(textLineQueue.remove());
122                 textBlockBuilder.append('\n');
123                 break;
124             } else {
125                 LOGGER.warn("invalid input on consumer: " + textLineQueue.remove());
126             }
127         }
128
129         // Get the rest of the text document
130         while (textLineQueue.size() > 0 && !textLineQueue.peek().startsWith(blockStartToken)) {
131             textBlockBuilder.append(textLineQueue.remove());
132             textBlockBuilder.append('\n');
133         }
134
135         // Condition the text block and return it
136         final String textBlock = textBlockBuilder.toString().trim();
137         final boolean endOfText = (eofOnInputStream && textLineQueue.size() == 0 ? true : false);
138
139         if (textBlock.length() > 0) {
140             return new TextBlock(endOfText, textBlock);
141         } else {
142             return new TextBlock(endOfText, null);
143         }
144     }
145
146     /*
147      * (non-Javadoc)
148      *
149      * @see java.lang.Runnable#run()
150      */
151     @Override
152     public void run() {
153         final BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream));
154
155         try {
156             // Read the input line by line until we see end of file on the stream
157             String line;
158             while ((line = textReader.readLine()) != null) {
159                 textLineQueue.add(line);
160             }
161         } catch (final IOException e) {
162             LOGGER.warn("I/O exception on text input on consumer: ", e);
163         } finally {
164             eofOnInputStream = true;
165         }
166     }
167 }