7366528c75e57ff7dd18c95e0c47ad16ae6e23ba
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / event / impl / filecarrierplugin / consumer / ApexFileEventConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
22
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.EnumMap;
27 import java.util.Map;
28 import java.util.Properties;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
34 import org.onap.policy.apex.service.engine.event.ApexEventException;
35 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
36 import org.onap.policy.apex.service.engine.event.PeeredReference;
37 import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FileCarrierTechnologyParameters;
38 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
39 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * Concrete implementation an Apex event consumer that reads events from a file. This consumer also implements
45  * ApexEventProducer and therefore can be used as a synchronous consumer.
46  *
47  * @author Liam Fallon (liam.fallon@ericsson.com)
48  */
49 public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
50     // Get a reference to the logger
51     private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventConsumer.class);
52
53     // Recurring string constants
54     private static final String APEX_FILE_CONSUMER_PREAMBLE = "ApexFileConsumer \"";
55
56     // The input stream to read events from
57     private InputStream eventInputStream;
58
59     // The text block reader that will read text blocks from the contents of the file
60     private TextBlockReader textBlockReader;
61
62     // The event receiver that will receive asynchronous events from this consumer
63     private ApexEventReceiver eventReceiver = null;
64
65     // The consumer thread and stopping flag
66     private Thread consumerThread;
67
68     // The name for this consumer
69     private String consumerName = null;
70
71     // The specific carrier technology parameters for this consumer
72     private FileCarrierTechnologyParameters fileCarrierTechnologyParameters;
73
74     // The peer references for this event handler
75     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(
76                     EventHandlerPeeredMode.class);
77
78     // Holds the next identifier for event execution.
79     private static AtomicLong nextExecutionID = new AtomicLong(0L);
80
81     /**
82      * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single
83      * JVM
84      *
85      * @return the next candidate value for a Execution ID
86      */
87     private static synchronized long getNextExecutionId() {
88         return nextExecutionID.getAndIncrement();
89     }
90
91     /**
92      * {@inheritDoc}.
93      */
94     @Override
95     public void init(final String name, final EventHandlerParameters consumerParameters,
96                     final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
97         this.eventReceiver = incomingEventReceiver;
98         this.consumerName = name;
99
100         // Get and check the Apex parameters from the parameter service
101         if (consumerParameters == null) {
102             final String errorMessage = "Consumer parameters for ApexFileConsumer \"" + consumerName + "\" is null";
103             LOGGER.warn(errorMessage);
104             throw new ApexEventException(errorMessage);
105         }
106
107         // Check and get the file Properties
108         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FileCarrierTechnologyParameters)) {
109             final String errorMessage = "specified consumer properties for ApexFileConsumer \"" + consumerName
110                             + "\" are not applicable to a File consumer";
111             LOGGER.warn(errorMessage);
112             throw new ApexEventException(errorMessage);
113         }
114         fileCarrierTechnologyParameters = (FileCarrierTechnologyParameters) consumerParameters
115                         .getCarrierTechnologyParameters();
116
117         // Open the file producing events
118         try {
119             if (fileCarrierTechnologyParameters.isStandardIo()) {
120                 eventInputStream = System.in;
121             } else {
122                 eventInputStream = new FileInputStream(fileCarrierTechnologyParameters.getFileName());
123             }
124
125             // Get an event composer for our event source
126             textBlockReader = new TextBlockReaderFactory().getTaggedReader(eventInputStream,
127                             consumerParameters.getEventProtocolParameters());
128         } catch (final IOException e) {
129             final String errorMessage = APEX_FILE_CONSUMER_PREAMBLE + consumerName
130                             + "\" failed to open file for reading: \"" + fileCarrierTechnologyParameters.getFileName()
131                             + "\"";
132             LOGGER.warn(errorMessage, e);
133             throw new ApexEventException(errorMessage, e);
134         }
135
136         if (fileCarrierTechnologyParameters.getStartDelay() > 0) {
137             ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay());
138         }
139     }
140
141     /**
142      * {@inheritDoc}.
143      */
144     @Override
145     public String getName() {
146         return consumerName;
147     }
148
149     /**
150      * {@inheritDoc}.
151      */
152     @Override
153     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
154         return peerReferenceMap.get(peeredMode);
155     }
156
157     /**
158      * {@inheritDoc}.
159      */
160     @Override
161     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
162         peerReferenceMap.put(peeredMode, peeredReference);
163     }
164
165     /**
166      * {@inheritDoc}.
167      */
168     @Override
169     public void start() {
170         // Configure and start the event reception thread
171         final String threadName = this.getClass().getName() + " : " + consumerName;
172         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
173         consumerThread.setDaemon(true);
174         consumerThread.start();
175     }
176
177     /**
178      * {@inheritDoc}.
179      */
180     @Override
181     public void run() {
182         // Check that we have been initialized in async or sync mode
183         if (eventReceiver == null) {
184             LOGGER.warn("\"{}\" has not been initilaized for either asynchronous or synchronous event handling",
185                             consumerName);
186             return;
187         }
188
189         // Read the events from the file while there are still events in the file
190         try {
191             // Read all the text blocks
192             TextBlock textBlock;
193             do {
194                 // Read the text block
195                 textBlock = textBlockReader.readTextBlock();
196
197                 // Process the event from the text block if there is one there
198                 if (textBlock.getText() != null) {
199                     eventReceiver.receiveEvent(getNextExecutionId(), new Properties(), textBlock.getText());
200                 }
201             }
202             while (!textBlock.isEndOfText());
203         } catch (final Exception e) {
204             LOGGER.warn("\"" + consumerName + "\" failed to read event from file: \""
205                             + fileCarrierTechnologyParameters.getFileName() + "\"", e);
206         } finally {
207             try {
208                 eventInputStream.close();
209             } catch (final IOException e) {
210                 LOGGER.warn(APEX_FILE_CONSUMER_PREAMBLE + consumerName + "\" failed to close file: \""
211                                 + fileCarrierTechnologyParameters.getFileName() + "\"", e);
212             }
213         }
214
215     }
216
217     /**
218      * {@inheritDoc}.
219      */
220     @Override
221     public void stop() {
222         try {
223             eventInputStream.close();
224         } catch (final IOException e) {
225             LOGGER.warn(APEX_FILE_CONSUMER_PREAMBLE + consumerName + "\" failed to close file for reading: \""
226                             + fileCarrierTechnologyParameters.getFileName() + "\"", e);
227         }
228
229         if (consumerThread.isAlive() && !consumerThread.isInterrupted()) {
230             consumerThread.interrupt();
231         }
232     }
233 }