97aa25fbef1ddac5b591fae03cd064095a161ce5
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.main;
22
23 import java.util.List;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.TimeUnit;
27
28 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
29 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
30 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
31 import org.onap.policy.apex.service.engine.event.ApexEvent;
32 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
33 import org.onap.policy.apex.service.engine.event.ApexEventException;
34 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
35 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
36 import org.onap.policy.apex.service.engine.event.PeeredReference;
37 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
38 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
39 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
40 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
43 import org.slf4j.ext.XLogger;
44 import org.slf4j.ext.XLoggerFactory;
45
46 /**
47  * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
48  * receiving using the configured receiving technology.
49  *
50  * @author Liam Fallon (liam.fallon@ericsson.com)
51  */
52 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
53     // Get a reference to the logger
54     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
55
56     // Interval to wait between thread shutdown checks
57     private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
58
59     // The amount of time to wait between polls of the event queue in milliseconds
60     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
61
62     // The name of the unmarshaler
63     private final String name;
64
65     // The engine service and consumer parameters
66     private final EngineServiceParameters engineServiceParameters;
67     private final EventHandlerParameters consumerParameters;
68
69     // The engine service handler to use for forwarding on of unmarshalled events
70     private ApexEngineServiceHandler engineServiceHandler;
71
72     // Apex event producer and event converter, all events are sent as string representations
73     private ApexEventConsumer consumer;
74     private ApexEventProtocolConverter converter;
75
76     // Temporary event holder for events going into Apex
77     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
78
79     // The unmarshaler thread and stopping flag
80     private Thread unmarshallerThread = null;
81     private boolean stopOrderedFlag = false;
82
83     /**
84      * Create the unmarshaler.
85      *
86      * @param name the name of the unmarshaler
87      * @param engineServiceParameters the engine service parameters for this Apex engine
88      * @param consumerParameters the consumer parameters for this specific unmarshaler
89      */
90     public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
91                     final EventHandlerParameters consumerParameters) {
92         this.name = name;
93         this.engineServiceParameters = engineServiceParameters;
94         this.consumerParameters = consumerParameters;
95     }
96
97     /**
98      * Configure the consumer and initialize the thread for event sending.
99      *
100      * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
101      * @throws ApexEventException on errors initializing event handling
102      */
103     public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
104         this.engineServiceHandler = incomingEngineServiceHandler;
105
106         // Create the consumer for sending events and the converter for transforming events
107         consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
108         consumer.init(this.name, this.consumerParameters, this);
109
110         converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
111     }
112
113     /**
114      * Start the unmarshaler and consumer threads.
115      */
116     public void start() {
117         // Start the consumer
118         consumer.start();
119
120         // Configure and start the event reception thread
121         final String threadName = engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName()
122                         + ":" + name;
123         unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
124         unmarshallerThread.setDaemon(true);
125         unmarshallerThread.start();
126     }
127
128     /**
129      * Gets the name of the unmarshaler.
130      *
131      * @return the unmarshaler name
132      */
133     public String getName() {
134         return name;
135     }
136
137     /**
138      * Gets the technology specific consumer for this unmarshaler.
139      *
140      * @return the consumer
141      */
142     public ApexEventConsumer getConsumer() {
143         return consumer;
144     }
145
146     /**
147      * Gets the event protocol converter for this unmarshaler.
148      *
149      * @return the event protocol converter
150      */
151     public ApexEventProtocolConverter getConverter() {
152         return converter;
153     }
154
155     /**
156      * Connect a synchronous unmarshaler with a synchronous marshaler.
157      * 
158      * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
159      * @param peeredMarshaller the synchronous marshaler to connect with
160      */
161     public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
162         switch (peeredMode) {
163             case SYNCHRONOUS:
164                 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
165                 // cache on the consumer/producer pair
166                 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
167                                 consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
168                 return;
169
170             case REQUESTOR:
171                 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
172                 return;
173
174             default:
175                 return;
176         }
177     }
178
179     /*
180      * (non-Javadoc)
181      * 
182      * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(java.lang.Object)
183      */
184     @Override
185     public void receiveEvent(final Object event) throws ApexEventException {
186         receiveEvent(0, event, true);
187     }
188
189     /*
190      * (non-Javadoc)
191      * 
192      * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(long, java.lang.Object)
193      */
194     @Override
195     public void receiveEvent(final long executionId, final Object event) throws ApexEventException {
196         receiveEvent(executionId, event, false);
197     }
198
199     /**
200      * Receive an event from a consumer, convert its protocol and forward it to Apex.
201      *
202      * @param executionId the execution id the incoming execution ID
203      * @param event the event in its native format
204      * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
205      * @throws ApexEventException on unmarshaling errors on events
206      */
207     private void receiveEvent(final long executionId, final Object event, final boolean generateExecutionId)
208                     throws ApexEventException {
209         // Push the event onto the queue
210         if (LOGGER.isTraceEnabled()) {
211             String eventString = "onMessage(): event received: " + event.toString();
212             LOGGER.trace(eventString);
213         }
214
215         // Convert the incoming events to Apex events
216         try {
217             final List<ApexEvent> apexEventList = converter.toApexEvent(consumerParameters.getEventName(), event);
218             for (final ApexEvent apexEvent : apexEventList) {
219                 isEventFilteredOut(apexEvent);
220
221                 // Check if this event is filtered out by the incoming filter
222                 if (isEventFilteredOut(apexEvent)) {
223                     // Ignore this event
224                     continue;
225                 }
226
227                 if (!generateExecutionId) {
228                     apexEvent.setExecutionId(executionId);
229                 }
230
231                 // Enqueue the event
232                 queue.add(apexEvent);
233
234                 // Cache synchronized events that are sent
235                 if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
236                     final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) consumer
237                                     .getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
238                     synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
239                 }
240             }
241         } catch (final ApexException e) {
242             final String errorMessage = "Error while converting event into an ApexEvent for " + name + ": "
243                             + e.getMessage() + ", Event=" + event;
244             LOGGER.warn(errorMessage, e);
245             throw new ApexEventException(errorMessage, e);
246         }
247     }
248
249     /**
250      * Check if an event is filtered out and ignored.
251      * 
252      * @param apexEvent the event to check
253      */
254     private boolean isEventFilteredOut(final ApexEvent apexEvent) {
255         // Check if we are filtering events on this unmarshaler, if so check the event name
256         // against the filter
257         if (consumerParameters.isSetEventNameFilter()
258                         && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
259             
260             if (LOGGER.isTraceEnabled()) {
261                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
262                                 consumerParameters.getEventNameFilter());
263             }
264             
265             return true;
266         }
267         else {
268             return false;
269         }
270     }
271
272     /**
273      * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
274      * queue.
275      */
276     @Override
277     public void run() {
278         // Run until interruption
279         while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
280             try {
281                 // Take the next event from the queue
282                 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
283                 if (apexEvent == null) {
284                     continue;
285                 }
286
287                 if (LOGGER.isTraceEnabled()) {
288                     String message = apexEvent.toString();
289                     LOGGER.trace("event received {}", message);
290                 }
291
292                 // Pass the event to the activator for forwarding to Apex
293                 engineServiceHandler.forwardEvent(apexEvent);
294             } catch (final InterruptedException e) {
295                 // restore the interrupt status
296                 Thread.currentThread().interrupt();
297                 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
298                 stopOrderedFlag = true;
299             } catch (final Exception e) {
300                 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
301             }
302         }
303
304         // Stop event production
305         consumer.stop();
306     }
307
308     /**
309      * Get the unmarshaler thread.
310      *
311      * @return the unmarshaler thread
312      */
313     public Thread getThread() {
314         return unmarshallerThread;
315     }
316
317     /**
318      * Stop the Apex event unmarshaller's event producer using its termination mechanism.
319      */
320     public void stop() {
321         LOGGER.entry("shutting down Apex event unmarshaller . . .");
322
323         // Order the stop
324         stopOrderedFlag = true;
325
326         // Order a stop on the synchronous cache if one exists
327         if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
328                         && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
329             ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
330         }
331
332         // Wait for thread shutdown
333         while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
334             ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
335         }
336
337         LOGGER.exit("shut down Apex event unmarshaller");
338     }
339 }