/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

22 package org.onap.policy.apex.service.engine.main;
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import lombok.NonNull;
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
33 import org.onap.policy.apex.service.engine.event.ApexEvent;
34 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
35 import org.onap.policy.apex.service.engine.event.ApexEventException;
36 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
37 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
38 import org.onap.policy.apex.service.engine.event.PeeredReference;
39 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
40 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
41 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
42 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
44 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
45 import org.slf4j.ext.XLogger;
46 import org.slf4j.ext.XLoggerFactory;
49 * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
50 * receiving using the configured receiving technology.
52 * @author Liam Fallon (liam.fallon@ericsson.com)
54 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
55 // Get a reference to the logger
56 private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
58 // Interval to wait between thread shutdown checks
59 private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
61 // The amount of time to wait between polls of the event queue in milliseconds
62 private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
64 // The name of the unmarshaler
65 private final String name;
67 // The engine service and consumer parameters
68 private final EngineServiceParameters engineServiceParameters;
69 private final EventHandlerParameters consumerParameters;
71 // The engine service handler to use for forwarding on of unmarshalled events
72 private ApexEngineServiceHandler engineServiceHandler;
74 // Apex event producer and event converter, all events are sent as string representations
75 private ApexEventConsumer consumer;
76 private ApexEventProtocolConverter converter;
78 // Temporary event holder for events going into Apex
79 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
81 // The unmarshaler thread and stopping flag
82 private Thread unmarshallerThread = null;
83 private boolean stopOrderedFlag = false;
86 * Create the unmarshaler.
88 * @param name the name of the unmarshaler
89 * @param engineServiceParameters the engine service parameters for this Apex engine
90 * @param consumerParameters the consumer parameters for this specific unmarshaler
92 public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
93 final EventHandlerParameters consumerParameters) {
95 this.engineServiceParameters = engineServiceParameters;
96 this.consumerParameters = consumerParameters;
100 * Configure the consumer and initialize the thread for event sending.
102 * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
103 * @throws ApexEventException on errors initializing event handling
105 public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
106 this.engineServiceHandler = incomingEngineServiceHandler;
108 // Create the consumer for sending events and the converter for transforming events
109 consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
110 consumer.init(this.name, this.consumerParameters, this);
112 converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
116 * Start the unmarshaler and consumer threads.
118 public void start() {
119 // Start the consumer
122 // Configure and start the event reception thread
123 final String threadName =
124 engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
125 unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
126 unmarshallerThread.setDaemon(true);
127 unmarshallerThread.start();
131 * Gets the name of the unmarshaler.
133 * @return the unmarshaler name
135 public String getName() {
140 * Gets the technology specific consumer for this unmarshaler.
142 * @return the consumer
144 public ApexEventConsumer getConsumer() {
149 * Gets the event protocol converter for this unmarshaler.
151 * @return the event protocol converter
153 public ApexEventProtocolConverter getConverter() {
158 * Connect a synchronous unmarshaler with a synchronous marshaler.
160 * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
161 * @param peeredMarshaller the synchronous marshaler to connect with
163 public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
164 switch (peeredMode) {
166 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
167 // cache on the consumer/producer pair
168 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
169 consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
173 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
185 public void receiveEvent(@NonNull final Properties executionProperties, final Object event)
186 throws ApexEventException {
187 receiveEvent(0, executionProperties, event, true);
194 public void receiveEvent(final long executionId, @NonNull final Properties executionProperties, final Object event)
195 throws ApexEventException {
196 receiveEvent(executionId, executionProperties, event, false);
200 * Receive an event from a consumer, convert its protocol and forward it to Apex.
202 * @param executionId the execution id the incoming execution ID
203 * @param executionProperties properties used during processing of this event
204 * @param event the event in its native format
205 * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
206 * @throws ApexEventException on unmarshaling errors on events
208 private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
209 final boolean generateExecutionId) throws ApexEventException {
210 // Push the event onto the queue
211 if (LOGGER.isTraceEnabled()) {
212 String eventString = "onMessage(): event received: " + event.toString();
213 LOGGER.trace(eventString);
216 // Convert the incoming events to Apex events
218 final List<ApexEvent> apexEventList = converter.toApexEvent(consumerParameters.getEventName(), event);
220 for (final ApexEvent apexEvent : apexEventList) {
221 isEventFilteredOut(apexEvent);
223 // Check if this event is filtered out by the incoming filter
224 if (isEventFilteredOut(apexEvent)) {
229 if (!generateExecutionId) {
230 apexEvent.setExecutionId(executionId);
233 apexEvent.setExecutionProperties(executionProperties);
235 // Cache synchronized events that are sent
236 if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
237 final SynchronousEventCache synchronousEventCache =
238 (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
239 synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
243 queue.add(apexEvent);
245 } catch (final ApexException e) {
246 final String errorMessage = "Error while converting event into an ApexEvent for " + name + ": "
247 + e.getMessage() + ", Event=" + event;
248 LOGGER.warn(errorMessage, e);
249 throw new ApexEventException(errorMessage, e);
254 * Check if an event is filtered out and ignored.
256 * @param apexEvent the event to check
258 private boolean isEventFilteredOut(final ApexEvent apexEvent) {
259 // Check if we are filtering events on this unmarshaler, if so check the event name
260 // against the filter
261 if (consumerParameters.isSetEventNameFilter()
262 && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
264 if (LOGGER.isTraceEnabled()) {
265 LOGGER.trace("onMessage(): event {} not processed, filtered out by filter", apexEvent,
266 consumerParameters.getEventNameFilter());
276 * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
281 // Run until interruption
282 while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
284 // Take the next event from the queue
285 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
286 if (apexEvent == null) {
290 if (LOGGER.isTraceEnabled()) {
291 String message = apexEvent.toString();
292 LOGGER.trace("event received {}", message);
295 // Pass the event to the activator for forwarding to Apex
296 engineServiceHandler.forwardEvent(apexEvent);
297 } catch (final InterruptedException e) {
298 // restore the interrupt status
299 Thread.currentThread().interrupt();
300 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
301 stopOrderedFlag = true;
302 } catch (final Exception e) {
303 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
307 // Stop event production
312 * Get the unmarshaler thread.
314 * @return the unmarshaler thread
316 public Thread getThread() {
317 return unmarshallerThread;
321 * Stop the Apex event unmarshaller's event producer using its termination mechanism.
324 LOGGER.entry("shutting down Apex event unmarshaller . . .");
327 stopOrderedFlag = true;
329 // Order a stop on the synchronous cache if one exists
330 if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
331 && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
332 ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
335 // Wait for thread shutdown
336 while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
337 ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
340 LOGGER.exit("shut down Apex event unmarshaller");