2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.main;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
27 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
28 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
29 import org.onap.policy.apex.service.engine.event.ApexEvent;
30 import org.onap.policy.apex.service.engine.event.ApexEventException;
31 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
32 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
33 import org.onap.policy.apex.service.engine.event.impl.EventProducerFactory;
34 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
35 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
36 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
38 import org.slf4j.ext.XLogger;
39 import org.slf4j.ext.XLoggerFactory;
42 * This event marshaler handles events coming out of Apex and sends them on, handles threading,
43 * event queuing, transformations and sending using the configured sending technology.
45 * @author Liam Fallon (liam.fallon@ericsson.com)
47 public class ApexEventMarshaller implements ApexEventListener, Runnable {
48 // Get a reference to the logger
49 private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventMarshaller.class);
51 // Interval to wait between thread shutdown checks
52 private static final int MARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
54 // The amount of time to wait between polls of the event queue in milliseconds
55 private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
57 // The name of the marshaler
58 private final String name;
60 // The engine service and producer parameters
61 private final EngineServiceParameters engineServiceParameters;
62 private final EventHandlerParameters producerParameters;
64 // Apex event producer and event converter, all conversions are to and from string
65 // representation of events
66 private ApexEventProducer producer;
67 private ApexEventProtocolConverter converter;
69 // Temporary event holder for events coming out of Apex
70 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
72 // The marshaler thread and stopping flag
73 private Thread marshallerThread;
74 private boolean stopOrderedFlag = false;
77 * Create the marshaler.
79 * @param name the name of the marshaler
80 * @param engineServiceParameters the engine service parameters for this Apex engine
81 * @param producerParameters the producer parameters for this specific marshaler
83 public ApexEventMarshaller(final String name, final EngineServiceParameters engineServiceParameters,
84 final EventHandlerParameters producerParameters) {
86 this.engineServiceParameters = engineServiceParameters;
87 this.producerParameters = producerParameters;
91 * Configure the marshaler by setting up the producer and event converter and initialize the
92 * thread for event sending.
94 * @throws ApexActivatorException on errors initializing the producer
95 * @throws ApexEventException on errors initializing event handling
97 public void init() throws ApexActivatorException, ApexEventException {
98 // Create the producer for sending events and the converter for transforming events
99 producer = new EventProducerFactory().createProducer(name, producerParameters);
101 // Initialize the producer
102 producer.init(this.name, this.producerParameters);
104 // Create the converter for transforming events
105 converter = new EventProtocolFactory().createConverter(name, producerParameters.getEventProtocolParameters());
107 // Configure and start the event sending thread
108 final String threadName =
109 engineServiceParameters.getEngineKey().getName() + ':' + this.getClass().getName() + ':' + this.name;
110 marshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
111 marshallerThread.setDaemon(true);
112 marshallerThread.start();
116 * Gets the name of the marshaler.
118 * @return the marshaler name
120 public String getName() {
125 * Gets the technology specific producer for this marshaler.
127 * @return the producer
129 public ApexEventProducer getProducer() {
134 * Gets the event protocol converter for this marshaler.
136 * @return the event protocol converter
138 public ApexEventProtocolConverter getConverter() {
143 * Callback method called on implementations of this interface when Apex emits an event.
145 * @param apexEvent the apex event emitted by Apex
148 public void onApexEvent(final ApexEvent apexEvent) {
149 // Check if we are filtering events on this marshaler, if so check the event name against
151 if (producerParameters.isSetEventNameFilter()
152 && !apexEvent.getName().matches(producerParameters.getEventNameFilter())) {
153 if (LOGGER.isTraceEnabled()) {
154 LOGGER.trace("onMessage(): event {} not processed, filtered out by filter", apexEvent,
155 producerParameters.getEventNameFilter());
162 // Push the event onto the queue for handling
164 queue.put(apexEvent);
165 } catch (final InterruptedException e) {
166 // restore the interrupt status
167 Thread.currentThread().interrupt();
168 LOGGER.warn("Failed to queue the event: " + apexEvent, e);
173 * Run a thread that runs forever (well until system termination anyway) and listens for
174 * outgoing events on the queue.
178 // Run until interrupted
179 while (marshallerThread.isAlive() && !stopOrderedFlag) {
181 // Take the next event from the queue
182 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
183 if (apexEvent == null) {
187 // Process the next Apex event from the queue
188 final Object event = converter.fromApexEvent(apexEvent);
190 producer.sendEvent(apexEvent.getExecutionID(), apexEvent.getName(), event);
192 if (LOGGER.isTraceEnabled()) {
193 LOGGER.trace("event sent : " + apexEvent.toString());
195 } catch (final InterruptedException e) {
196 // restore the interrupt status
197 Thread.currentThread().interrupt();
198 LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
200 } catch (final Exception e) {
201 LOGGER.warn("Error while forwarding events for " + marshallerThread.getName(), e);
206 // Stop event production if we are not synchronized,;in the synchronized case, the producer
207 // takes care of its own cleanup.
212 * Get the marshaler thread.
214 * @return the marshaler thread
216 public Thread getThread() {
217 return marshallerThread;
221 * Stop the Apex event marshaller's event producer using its termination mechanism.
224 LOGGER.entry("shutting down Apex event marshaller . . .");
227 stopOrderedFlag = true;
229 // Wait for thread shutdown
230 while (marshallerThread.isAlive()) {
231 ThreadUtilities.sleep(MARSHALLER_SHUTDOWN_WAIT_INTERVAL);
234 LOGGER.exit("shut down Apex event marshaller");