2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.main;
23 import java.util.List;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.TimeUnit;
28 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
29 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
30 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
31 import org.onap.policy.apex.service.engine.event.ApexEvent;
32 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
33 import org.onap.policy.apex.service.engine.event.ApexEventException;
34 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
35 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
36 import org.onap.policy.apex.service.engine.event.PeeredReference;
37 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
38 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
39 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
40 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
43 import org.slf4j.ext.XLogger;
44 import org.slf4j.ext.XLoggerFactory;
47 * This event unmarshaler handles events coming into Apex, handles threading, event queuing,
48 * transformation and receiving using the configured receiving technology.
50 * @author Liam Fallon (liam.fallon@ericsson.com)
// Receives events from a technology specific consumer (it implements ApexEventReceiver), converts
// them to ApexEvent instances with the configured protocol converter, buffers them on an internal
// queue and forwards them to the engine service handler from its own thread (it implements
// Runnable).
52 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
53 // Logger for this class
54 private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
56 // Interval passed to ThreadUtilities.sleep() between checks that the unmarshaler thread has
// terminated (presumably milliseconds - TODO confirm against ThreadUtilities)
57 private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
59 // The amount of time to wait between polls of the event queue in milliseconds
60 private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
62 // The name of the unmarshaler; also used when creating the consumer and converter and as the
// last element of the unmarshaler thread name
63 private final String name;
65 // The engine service parameters (source of the engine key used in the thread name) and the
// parameters of the consumer served by this unmarshaler
66 private final EngineServiceParameters engineServiceParameters;
67 private final EventHandlerParameters consumerParameters;
69 // The engine service handler that unmarshalled events are forwarded to
70 private ApexEngineServiceHandler engineServiceHandler;
72 // The technology specific event consumer and the event protocol converter, both created in init()
73 private ApexEventConsumer consumer;
74 private ApexEventProtocolConverter converter;
76 // Unbounded queue holding converted events until the unmarshaler thread forwards them to Apex
77 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
79 // The unmarshaler thread and stopping flag. The flag is set by the thread that orders the
// shutdown and is tested by the unmarshaler thread in its polling loop, so it is declared
// volatile to guarantee that the stop order becomes visible to the running thread.
80 private Thread unmarshallerThread = null;
81 private volatile boolean stopOrderedFlag = false;
84 * Create the unmarshaler.
86 * @param name the name of the unmarshaler
87 * @param engineServiceParameters the engine service parameters for this Apex engine
88 * @param consumerParameters the consumer parameters for this specific unmarshaler
// Stores the engine service parameters (read later when the thread name is built) and the
// consumer parameters (read later when the consumer and converter are created).
// NOTE(review): no assignment to the final field 'name' is visible in this listing; confirm
// against the complete file that the constructor also stores the 'name' argument.
90 public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
91 final EventHandlerParameters consumerParameters) {
93 this.engineServiceParameters = engineServiceParameters;
94 this.consumerParameters = consumerParameters;
98 * Configure the event consumer and the event protocol converter used for receiving events.
100 * @param incomingEngineServiceHandler the Apex engine service handler to pass events to
102 * @throws ApexEventException on errors initializing event handling
// Remembers the engine service handler, then creates and initialises the event consumer and
// creates the event protocol converter from the consumer parameters. No thread is created here.
104 public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
105 this.engineServiceHandler = incomingEngineServiceHandler;
107 // Create the consumer for receiving events; this unmarshaler (an ApexEventReceiver) is handed
// to the consumer's init() together with the name and the consumer parameters
108 consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
109 consumer.init(this.name, this.consumerParameters, this);
// Create the converter for transforming events, using the event protocol parameters of the consumer
111 converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
115 * Start the unmarshaler and consumer threads.
// Creates the unmarshaler thread through an ApplicationThreadFactory, marks it as a daemon
// thread and starts it with this object as its Runnable.
// NOTE(review): the statement that belongs to the "Start the consumer" comment is not visible in
// this listing; confirm against the complete file.
117 public void start() {
118 // Start the consumer
121 // Configure and start the event reception thread
// The name given to the thread factory is "<engine key name>:<fully qualified class name>:<unmarshaler name>"
122 final String threadName =
123 engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
124 unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
// A daemon thread does not prevent the JVM from exiting
125 unmarshallerThread.setDaemon(true);
126 unmarshallerThread.start();
130 * Gets the name of the unmarshaler.
132 * @return the unmarshaler name
// Accessor declared to return the unmarshaler name as a String.
// NOTE(review): the method body is not visible in this listing - presumably it returns the
// 'name' field; confirm against the complete file.
134 public String getName() {
139 * Gets the technology specific consumer for this unmarshaler.
141 * @return the consumer
// Accessor declared to return the ApexEventConsumer of this unmarshaler.
// NOTE(review): the method body is not visible in this listing - presumably it returns the
// 'consumer' field, which is only assigned in init(); confirm against the complete file.
143 public ApexEventConsumer getConsumer() {
148 * Gets the event protocol converter for this unmarshaler.
150 * @return the event protocol converter
// Accessor declared to return the ApexEventProtocolConverter of this unmarshaler.
// NOTE(review): the method body is not visible in this listing - presumably it returns the
// 'converter' field, which is only assigned in init(); confirm against the complete file.
152 public ApexEventProtocolConverter getConverter() {
157 * Connect a synchronous unmarshaler with a synchronous marshaler.
159 * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
160 * @param peeredMarshaller the synchronous marshaler to connect with
// Links the consumer of this unmarshaler with the producer of the given marshaler, choosing the
// kind of link from the peered mode.
// NOTE(review): the case labels of the switch are not visible in this listing, so which peered
// mode selects which statement below must be confirmed against the complete file.
162 public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
163 switch (peeredMode) {
165 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
166 // cache on the consumer/producer pair. The cache is given the peer timeout configured for
// SYNCHRONOUS mode. The new object is not stored here; presumably its constructor registers
// it on the consumer and producer - TODO confirm against SynchronousEventCache
167 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
168 consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
// Otherwise a plain peered reference is created on the consumer/producer pair; as above, the
// new object is not stored here
172 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
184 * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(java.lang.Object)
// Receives an event that comes without an execution ID. Delegates to the three-argument overload
// with generateExecutionId set to true, so the placeholder ID 0 is never applied to the event.
187 public void receiveEvent(final Object event) throws ApexEventException {
188 receiveEvent(0, event, true);
194 * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(long, java.lang.Object)
// Receives an event together with the execution ID to use for it. Delegates to the
// three-argument overload with generateExecutionId set to false, so the supplied ID is set on
// every Apex event produced from this incoming event.
198 public void receiveEvent(final long executionId, final Object event) throws ApexEventException {
199 receiveEvent(executionId, event, false);
203 * Receive an event from a consumer, convert its protocol and forward it to Apex.
205 * @param executionId the incoming execution ID
206 * @param event the event in its native format
207 * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the
208 * incoming execution ID
209 * @throws ApexEventException on unmarshaling errors on events
// Converts one incoming event into a list of Apex events, applies the optional event name
// filter, sets the execution ID when one was supplied, queues each event for the unmarshaler
// thread and, in SYNCHRONOUS peered mode, records it in the synchronous event cache.
// Conversion failures (ApexException) are logged and rethrown as ApexEventException.
// NOTE(review): some short lines of this method (for example the statement that follows the
// filter trace) are not visible in this listing; they are left untouched by this edit.
211 private void receiveEvent(final long executionId, final Object event, final boolean generateExecutionId)
212 throws ApexEventException {
213 // Trace the incoming event in its native format
214 if (LOGGER.isTraceEnabled()) {
215 LOGGER.trace("onMessage(): event received: {}", event.toString());
218 // Convert the incoming events to Apex events
220 final List<ApexEvent> apexEventList = converter.toApexEvent(consumerParameters.getEventName(), event);
221 for (final ApexEvent apexEvent : apexEventList) {
222 // Check if we are filtering events on this unmarshaler, if so check the event name
223 // against the filter (the filter is a regular expression matched against the whole name)
224 if (consumerParameters.isSetEventNameFilter()
225 && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
226 if (LOGGER.isTraceEnabled()) {
// One placeholder per argument: without the second "{}" the filter value was silently dropped
227 LOGGER.trace("onMessage(): event {} not processed, filtered out by filter {}", apexEvent,
228 consumerParameters.getEventNameFilter());
// Use the caller's execution ID only when Apex is not asked to generate one
235 if (!generateExecutionId) {
236 apexEvent.setExecutionID(executionId);
// Hand the event over to the unmarshaler thread
240 queue.add(apexEvent);
242 // Cache synchronized events that are sent; the cache is the peered reference held by the
// consumer for SYNCHRONOUS mode
243 if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
244 final SynchronousEventCache synchronousEventCache =
245 (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
246 synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionID(), apexEvent);
249 } catch (final ApexException e) {
// Log with the unmarshaler name and the offending event, then rethrow with the cause preserved
250 final String errorMessage = "Error while converting event into an ApexEvent for " + name + ": "
251 + e.getMessage() + ", Event=" + event;
252 LOGGER.warn(errorMessage, e);
253 throw new ApexEventException(errorMessage, e);
258 * Run a thread that listens for incoming events on the queue and forwards them to Apex
259 * until a stop is ordered.
// Body of the unmarshaler thread: polls the event queue and forwards each event to the engine
// service handler until the stop flag is set.
// NOTE(review): the method header and several short lines (for example the statements that end
// each catch block) are not visible in this listing; they are left untouched by this edit.
263 // Run until a stop is ordered
264 while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
266 // Take the next event from the queue, waiting at most EVENT_QUEUE_POLL_INTERVAL
// milliseconds so that the stop flag is tested again at regular intervals
267 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
// poll() returns null when no event arrived within the interval
268 if (apexEvent == null) {
272 if (LOGGER.isTraceEnabled()) {
273 LOGGER.trace("event received {}", apexEvent.toString());
276 // Pass the event to the activator for forwarding to Apex
277 engineServiceHandler.forwardEvent(apexEvent);
278 } catch (final InterruptedException e) {
279 // restore the interrupt status
280 Thread.currentThread().interrupt();
// The message names this component; it previously referred to a "BatchProcessor" thread
281 LOGGER.warn("ApexEventUnmarshaller thread interrupted, Reason {}", e.getMessage());
283 } catch (final Exception e) {
// Any other failure while forwarding is logged with the name of the unmarshaler thread
284 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
289 // Stop event production
294 * Get the unmarshaler thread.
296 * @return the unmarshaler thread
// Returns the unmarshaler thread. The field is initialised to null and only assigned when the
// thread is created in start(), so callers may receive null before that point.
298 public Thread getThread() {
299 return unmarshallerThread;
303 * Stop the Apex event unmarshaller: order its thread to stop and wait for the thread to terminate.
// NOTE(review): the header of this method is not visible in this listing; these lines are its body.
// Shutdown sequence: set the stop flag, stop the synchronous event cache when one is attached to
// the consumer, then block the caller until the unmarshaler thread is no longer alive.
306 LOGGER.entry("shutting down Apex event unmarshaller . . .");
// Tell the polling loop, which tests this flag on every iteration, to finish
309 stopOrderedFlag = true;
311 // Order a stop on the synchronous cache if one exists; the cache is the consumer's peered
// reference for SYNCHRONOUS mode and is looked up twice, once for the null check and once for the call
312 if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
313 if (consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
314 ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
318 // Wait for thread shutdown, sleeping UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL between liveness
// checks; nothing is awaited when the thread was never created (null)
319 while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
320 ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
323 LOGGER.exit("shut down Apex event unmarshaller");