076c50b4055eaab169536a65a724b4a043b6c556
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
23
24 import java.io.FileInputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.EnumMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.atomic.AtomicLong;
31 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
34 import org.onap.policy.apex.service.engine.event.ApexEventException;
35 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
36 import org.onap.policy.apex.service.engine.event.PeeredReference;
37 import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FileCarrierTechnologyParameters;
38 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
39 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * Concrete implementation an Apex event consumer that reads events from a file. This consumer also implements
45  * ApexEventProducer and therefore can be used as a synchronous consumer.
46  *
47  * @author Liam Fallon (liam.fallon@ericsson.com)
48  */
49 public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
50     // Get a reference to the logger
51     private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventConsumer.class);
52
53     // Recurring string constants
54     private static final String APEX_FILE_CONSUMER_PREAMBLE = "ApexFileConsumer \"";
55
56     // The input stream to read events from
57     private InputStream eventInputStream;
58
59     // The text block reader that will read text blocks from the contents of the file
60     private TextBlockReader textBlockReader;
61
62     // The event receiver that will receive asynchronous events from this consumer
63     private ApexEventReceiver eventReceiver = null;
64
65     // The consumer thread and stopping flag
66     private Thread consumerThread;
67
68     // The name for this consumer
69     private String consumerName = null;
70
71     // The specific carrier technology parameters for this consumer
72     private FileCarrierTechnologyParameters fileCarrierTechnologyParameters;
73
74     // The peer references for this event handler
75     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(
76                     EventHandlerPeeredMode.class);
77
78     // Holds the next identifier for event execution.
79     private static AtomicLong nextExecutionID = new AtomicLong(0L);
80
81     /**
82      * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single
83      * JVM
84      *
85      * @return the next candidate value for a Execution ID
86      */
87     private static synchronized long getNextExecutionId() {
88         return nextExecutionID.getAndIncrement();
89     }
90
91     /**
92      * {@inheritDoc}.
93      */
94     @Override
95     public void init(final String name, final EventHandlerParameters consumerParameters,
96                     final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
97         this.eventReceiver = incomingEventReceiver;
98         this.consumerName = name;
99
100         // Get and check the Apex parameters from the parameter service
101         if (consumerParameters == null) {
102             final String errorMessage = "Consumer parameters for ApexFileConsumer \"" + consumerName + "\" is null";
103             LOGGER.warn(errorMessage);
104             throw new ApexEventException(errorMessage);
105         }
106
107         // Check and get the file Properties
108         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FileCarrierTechnologyParameters)) {
109             final String errorMessage = "specified consumer properties for ApexFileConsumer \"" + consumerName
110                             + "\" are not applicable to a File consumer";
111             LOGGER.warn(errorMessage);
112             throw new ApexEventException(errorMessage);
113         }
114         fileCarrierTechnologyParameters = (FileCarrierTechnologyParameters) consumerParameters
115                         .getCarrierTechnologyParameters();
116
117         // Open the file producing events
118         try {
119             if (fileCarrierTechnologyParameters.isStandardIo()) {
120                 eventInputStream = System.in;
121             } else {
122                 eventInputStream = new FileInputStream(fileCarrierTechnologyParameters.getFileName());
123             }
124
125             // Get an event composer for our event source
126             textBlockReader = new TextBlockReaderFactory().getTaggedReader(eventInputStream,
127                             consumerParameters.getEventProtocolParameters());
128         } catch (final IOException e) {
129             final String errorMessage = APEX_FILE_CONSUMER_PREAMBLE + consumerName
130                             + "\" failed to open file for reading: \"" + fileCarrierTechnologyParameters.getFileName()
131                             + "\"";
132             LOGGER.warn(errorMessage, e);
133             throw new ApexEventException(errorMessage, e);
134         }
135
136         if (fileCarrierTechnologyParameters.getStartDelay() > 0) {
137             ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay());
138         }
139     }
140
141     /**
142      * {@inheritDoc}.
143      */
144     @Override
145     public String getName() {
146         return consumerName;
147     }
148
149     /**
150      * {@inheritDoc}.
151      */
152     @Override
153     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
154         return peerReferenceMap.get(peeredMode);
155     }
156
157     /**
158      * {@inheritDoc}.
159      */
160     @Override
161     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
162         peerReferenceMap.put(peeredMode, peeredReference);
163     }
164
165     /**
166      * {@inheritDoc}.
167      */
168     @Override
169     public void start() {
170         // Configure and start the event reception thread
171         final String threadName = this.getClass().getName() + " : " + consumerName;
172         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
173         consumerThread.setDaemon(true);
174         consumerThread.start();
175     }
176
177     /**
178      * {@inheritDoc}.
179      */
180     @Override
181     public void run() {
182         // Check that we have been initialized in async or sync mode
183         if (eventReceiver == null) {
184             LOGGER.warn("\"{}\" has not been initilaized for either asynchronous or synchronous event handling",
185                             consumerName);
186             return;
187         }
188
189         // Read the events from the file while there are still events in the file
190         try {
191             // Read all the text blocks
192             TextBlock textBlock;
193             do {
194                 // Read the text block
195                 textBlock = textBlockReader.readTextBlock();
196
197                 // Process the event from the text block if there is one there
198                 if (textBlock.getText() != null) {
199                     eventReceiver.receiveEvent(getNextExecutionId(), new Properties(), textBlock.getText());
200                 }
201             } while (!textBlock.isEndOfText());
202         } catch (final Exception e) {
203             LOGGER.warn("\"{}\" failed to read event from file: \"{}\"", consumerName,
204                     fileCarrierTechnologyParameters.getFileName(), e);
205         } finally {
206             try {
207                 eventInputStream.close();
208             } catch (final IOException e) {
209                 LOGGER.warn("{}{}\" failed to close file: \"{}\"", APEX_FILE_CONSUMER_PREAMBLE, consumerName,
210                         fileCarrierTechnologyParameters.getFileName(), e);
211             }
212         }
213
214     }
215
216     /**
217      * {@inheritDoc}.
218      */
219     @Override
220     public void stop() {
221         try {
222             eventInputStream.close();
223         } catch (final IOException e) {
224             LOGGER.warn("{}{}\" failed to close file for reading: \"{}\"",
225                     APEX_FILE_CONSUMER_PREAMBLE, consumerName, fileCarrierTechnologyParameters.getFileName(), e);
226         }
227
228         if (consumerThread.isAlive() && !consumerThread.isInterrupted()) {
229             consumerThread.interrupt();
230         }
231     }
232 }