7521c3a081dd46c4979f85455e0303cdc2ae18f5
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
22
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.EnumMap;
27 import java.util.Map;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
33 import org.onap.policy.apex.service.engine.event.ApexEventException;
34 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
35 import org.onap.policy.apex.service.engine.event.PeeredReference;
36 import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
38 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Concrete implementation an Apex event consumer that reads events from a file. This consumer also
44  * implements ApexEventProducer and therefore can be used as a synchronous consumer.
45  *
46  * @author Liam Fallon (liam.fallon@ericsson.com)
47  */
48 public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
49
50     // Get a reference to the logger
51     private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventConsumer.class);
52
53     // The input stream to read events from
54     private InputStream eventInputStream;
55
56     // The text block reader that will read text blocks from the contents of the file
57     private TextBlockReader textBlockReader;
58
59     // The event receiver that will receive asynchronous events from this consumer
60     private ApexEventReceiver eventReceiver = null;
61
62     // The consumer thread and stopping flag
63     private Thread consumerThread;
64
65     // The name for this consumer
66     private String consumerName = null;
67
68     // The specific carrier technology parameters for this consumer
69     private FILECarrierTechnologyParameters fileCarrierTechnologyParameters;
70
71     // The peer references for this event handler
72     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
73             new EnumMap<>(EventHandlerPeeredMode.class);
74
75     // Holds the next identifier for event execution.
76     private static AtomicLong nextExecutionID = new AtomicLong(0L);
77
78     /**
79      * Private utility to get the next candidate value for a Execution ID. This value will always be
80      * unique in a single JVM
81      * 
82      * @return the next candidate value for a Execution ID
83      */
84     private static synchronized long getNextExecutionID() {
85         return nextExecutionID.getAndIncrement();
86     }
87
88     /*
89      * (non-Javadoc)
90      *
91      * @see
92      * org.onap.policy.apex.apps.uservice.consumer.ApexEventConsumer#init(org.onap.policy.apex.apps.
93      * uservice.consumer.ApexEventReceiver)
94      */
95     @Override
96     public void init(final String name, final EventHandlerParameters consumerParameters,
97             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
98         this.eventReceiver = incomingEventReceiver;
99         this.consumerName = name;
100
101         // Get and check the Apex parameters from the parameter service
102         if (consumerParameters == null) {
103             final String errorMessage = "Consumer parameters for ApexFileConsumer \"" + consumerName + "\" is null";
104             LOGGER.warn(errorMessage);
105             throw new ApexEventException(errorMessage);
106         }
107
108         // Check and get the file Properties
109         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) {
110             final String errorMessage = "specified consumer properties for ApexFileConsumer \"" + consumerName
111                     + "\" are not applicable to a File consumer";
112             LOGGER.warn(errorMessage);
113             throw new ApexEventException(errorMessage);
114         }
115         fileCarrierTechnologyParameters =
116                 (FILECarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
117
118         // Open the file producing events
119         try {
120             if (fileCarrierTechnologyParameters.isStandardIO()) {
121                 eventInputStream = System.in;
122             } else {
123                 eventInputStream = new FileInputStream(fileCarrierTechnologyParameters.getFileName());
124             }
125
126             // Get an event composer for our event source
127             textBlockReader = new TextBlockReaderFactory().getTaggedReader(eventInputStream,
128                     consumerParameters.getEventProtocolParameters());
129         } catch (final IOException e) {
130             final String errorMessage = "ApexFileConsumer \"" + consumerName + "\" failed to open file for reading: \""
131                     + fileCarrierTechnologyParameters.getFileName() + "\"";
132             LOGGER.warn(errorMessage, e);
133             throw new ApexEventException(errorMessage, e);
134         }
135
136         if (fileCarrierTechnologyParameters.getStartDelay() > 0) {
137             ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay());
138         }
139     }
140
141     /*
142      * (non-Javadoc)
143      * 
144      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
145      */
146     @Override
147     public String getName() {
148         return consumerName;
149     }
150
151     /*
152      * (non-Javadoc)
153      * 
154      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.
155      * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
156      */
157     @Override
158     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
159         return peerReferenceMap.get(peeredMode);
160     }
161
162     /*
163      * (non-Javadoc)
164      * 
165      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.
166      * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
167      * org.onap.policy.apex.service.engine.event.PeeredReference)
168      */
169     @Override
170     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
171         peerReferenceMap.put(peeredMode, peeredReference);
172     }
173
174     /*
175      * (non-Javadoc)
176      * 
177      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
178      */
179     @Override
180     public void start() {
181         // Configure and start the event reception thread
182         final String threadName = this.getClass().getName() + " : " + consumerName;
183         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
184         consumerThread.setDaemon(true);
185         consumerThread.start();
186     }
187
188     /*
189      * (non-Javadoc)
190      *
191      * @see java.lang.Runnable#run()
192      */
193     @Override
194     public void run() {
195         // Check that we have been initialized in async or sync mode
196         if (eventReceiver == null) {
197             LOGGER.warn("\"{}\" has not been initilaized for either asynchronous or synchronous event handling",
198                     consumerName);
199             return;
200         }
201
202         // Read the events from the file while there are still events in the file
203         try {
204             // Read all the text blocks
205             TextBlock textBlock;
206             do {
207                 // Read the text block
208                 textBlock = textBlockReader.readTextBlock();
209
210                 // Process the event from the text block if there is one there
211                 if (textBlock.getText() != null) {
212                     eventReceiver.receiveEvent(getNextExecutionID(), textBlock.getText());
213                 }
214             } while (!textBlock.isEndOfText());
215         } catch (final Exception e) {
216             LOGGER.warn("\"" + consumerName + "\" failed to read event from file: \""
217                     + fileCarrierTechnologyParameters.getFileName() + "\"", e);
218         } finally {
219             try {
220                 eventInputStream.close();
221             } catch (final IOException e) {
222                 LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file: \""
223                         + fileCarrierTechnologyParameters.getFileName() + "\"", e);
224             }
225         }
226
227     }
228
229     /*
230      * (non-Javadoc)
231      *
232      * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
233      */
234     @Override
235     public void stop() {
236         try {
237             eventInputStream.close();
238         } catch (final IOException e) {
239             LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file for reading: \""
240                     + fileCarrierTechnologyParameters.getFileName() + "\"", e);
241         }
242
243         if (consumerThread.isAlive() && !consumerThread.isInterrupted()) {
244             consumerThread.interrupt();
245         }
246     }
247 }