7c0c17d6bb14c7d2ef608cb454ba9cbf889b90a2
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexEventMarshaller.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.main;
22
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26
27 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
28 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
29 import org.onap.policy.apex.service.engine.event.ApexEvent;
30 import org.onap.policy.apex.service.engine.event.ApexEventException;
31 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
32 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
33 import org.onap.policy.apex.service.engine.event.impl.EventProducerFactory;
34 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
35 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
36 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
38 import org.slf4j.ext.XLogger;
39 import org.slf4j.ext.XLoggerFactory;
40
41 /**
42  * This event marshaler handles events coming out of Apex and sends them on, handles threading, event queuing,
43  * transformations and sending using the configured sending technology.
44  *
45  * @author Liam Fallon (liam.fallon@ericsson.com)
46  */
47 public class ApexEventMarshaller implements ApexEventListener, Runnable {
48     // Get a reference to the logger
49     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventMarshaller.class);
50
51     // Interval to wait between thread shutdown checks
52     private static final int MARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
53
54     // The amount of time to wait between polls of the event queue in milliseconds
55     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
56
57     // The name of the marshaler
58     private final String name;
59
60     // The engine service and producer parameters
61     private final EngineServiceParameters engineServiceParameters;
62     private final EventHandlerParameters producerParameters;
63
64     // Apex event producer and event converter, all conversions are to and from string
65     // representation of events
66     private ApexEventProducer producer;
67     private ApexEventProtocolConverter converter;
68
69     // Temporary event holder for events coming out of Apex
70     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
71
72     // The marshaler thread and stopping flag
73     private Thread marshallerThread;
74     private boolean stopOrderedFlag = false;
75
76     /**
77      * Create the marshaler.
78      *
79      * @param name the name of the marshaler
80      * @param engineServiceParameters the engine service parameters for this Apex engine
81      * @param producerParameters the producer parameters for this specific marshaler
82      */
83     public ApexEventMarshaller(final String name, final EngineServiceParameters engineServiceParameters,
84             final EventHandlerParameters producerParameters) {
85         this.name = name;
86         this.engineServiceParameters = engineServiceParameters;
87         this.producerParameters = producerParameters;
88     }
89
90     /**
91      * Configure the marshaler by setting up the producer and event converter and initialize the thread for event
92      * sending.
93      *
94      * @throws ApexEventException on errors initializing event handling
95      */
96     public void init() throws ApexEventException {
97         // Create the producer for sending events and the converter for transforming events
98         producer = new EventProducerFactory().createProducer(name, producerParameters);
99
100         // Initialize the producer
101         producer.init(this.name, this.producerParameters);
102
103         // Create the converter for transforming events
104         converter = new EventProtocolFactory().createConverter(name, producerParameters.getEventProtocolParameters());
105
106         // Configure and start the event sending thread
107         final String threadName =
108                 engineServiceParameters.getEngineKey().getName() + ':' + this.getClass().getName() + ':' + this.name;
109         marshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
110         marshallerThread.setDaemon(true);
111         marshallerThread.start();
112     }
113
114     /**
115      * Gets the name of the marshaler.
116      *
117      * @return the marshaler name
118      */
119     public String getName() {
120         return name;
121     }
122
123     /**
124      * Gets the technology specific producer for this marshaler.
125      *
126      * @return the producer
127      */
128     public ApexEventProducer getProducer() {
129         return producer;
130     }
131
132     /**
133      * Gets the event protocol converter for this marshaler.
134      *
135      * @return the event protocol converter
136      */
137     public ApexEventProtocolConverter getConverter() {
138         return converter;
139     }
140
141     /**
142      * Callback method called on implementations of this interface when Apex emits an event.
143      *
144      * @param apexEvent the apex event emitted by Apex
145      */
146     @Override
147     public void onApexEvent(final ApexEvent apexEvent) {
148         // Check if we are filtering events on this marshaler, if so check the event name against
149         // the filter
150         if (producerParameters.isSetEventNameFilter()
151                 && !apexEvent.getName().matches(producerParameters.getEventNameFilter())) {
152             if (LOGGER.isTraceEnabled()) {
153                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
154                         producerParameters.getEventNameFilter());
155             }
156
157             // Ignore this event
158             return;
159         }
160
161         // Push the event onto the queue for handling
162         try {
163             queue.put(apexEvent);
164         } catch (final InterruptedException e) {
165             // restore the interrupt status
166             Thread.currentThread().interrupt();
167             LOGGER.warn("Queueing of the event was interrupted: " + apexEvent, e);
168         }
169     }
170
171     /**
172      * Run a thread that runs forever (well until system termination anyway) and listens for outgoing events on the
173      * queue.
174      */
175     @Override
176     public void run() {
177         // Run until interrupted
178         while (marshallerThread.isAlive() && !stopOrderedFlag) {
179             try {
180                 // Take the next event from the queue
181                 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
182                 if (apexEvent == null) {
183                     continue;
184                 }
185
186                 // Process the next Apex event from the queue
187                 final Object event = converter.fromApexEvent(apexEvent);
188
189                 producer.sendEvent(apexEvent.getExecutionId(), apexEvent.getExecutionProperties(), apexEvent.getName(),
190                         event);
191
192                 if (LOGGER.isTraceEnabled()) {
193                     final String message = "event sent : " + apexEvent.toString();
194                     LOGGER.trace(message);
195                 }
196             } catch (final InterruptedException e) {
197                 // restore the interrupt status
198                 Thread.currentThread().interrupt();
199                 LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
200                 stopOrderedFlag = true;
201             } catch (final Exception e) {
202                 LOGGER.warn("Error while forwarding events for " + marshallerThread.getName(), e);
203             }
204         }
205
206         // Stop event production if we are not synchronized,;in the synchronized case, the producer
207         // takes care of its own cleanup.
208         producer.stop();
209     }
210
211     /**
212      * Get the marshaler thread.
213      *
214      * @return the marshaler thread
215      */
216     public Thread getThread() {
217         return marshallerThread;
218     }
219
220     /**
221      * Stop the Apex event marshaller's event producer using its termination mechanism.
222      */
223     public void stop() {
224         LOGGER.entry("shutting down Apex event marshaller . . .");
225
226         // Order the stop
227         stopOrderedFlag = true;
228
229         // Wait for thread shutdown
230         while (marshallerThread.isAlive()) {
231             ThreadUtilities.sleep(MARSHALLER_SHUTDOWN_WAIT_INTERVAL);
232         }
233
234         LOGGER.exit("shut down Apex event marshaller");
235     }
236 }