2c1c900c8356844d3c42c90f5b0989a5ff10dc8d
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexEventUnmarshaller.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.main;
23
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import lombok.NonNull;
31
32 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
33 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
34 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
35 import org.onap.policy.apex.service.engine.event.ApexEvent;
36 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
37 import org.onap.policy.apex.service.engine.event.ApexEventException;
38 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
39 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
40 import org.onap.policy.apex.service.engine.event.PeeredReference;
41 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
42 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
43 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
44 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
45 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
46 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
47 import org.slf4j.ext.XLogger;
48 import org.slf4j.ext.XLoggerFactory;
49
50 /**
51  * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
52  * receiving using the configured receiving technology.
53  *
54  * @author Liam Fallon (liam.fallon@ericsson.com)
55  */
56 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
57     // Get a reference to the logger
58     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
59
60     // Interval to wait between thread shutdown checks
61     private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
62
63     // The amount of time to wait between polls of the event queue in milliseconds
64     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
65
66     // The name of the unmarshaler
67     private final String name;
68
69     // The engine service and consumer parameters
70     private final EngineServiceParameters engineServiceParameters;
71     private final EventHandlerParameters consumerParameters;
72
73     // The engine service handler to use for forwarding on of unmarshalled events
74     private ApexEngineServiceHandler engineServiceHandler;
75
76     // Apex event producer and event converter, all events are sent as string representations
77     private ApexEventConsumer consumer;
78     private ApexEventProtocolConverter converter;
79
80     // Temporary event holder for events going into Apex
81     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
82
83     // The unmarshaler thread and stopping flag
84     private Thread unmarshallerThread = null;
85     private boolean stopOrderedFlag = false;
86
87     /**
88      * Create the unmarshaler.
89      *
90      * @param name the name of the unmarshaler
91      * @param engineServiceParameters the engine service parameters for this Apex engine
92      * @param consumerParameters the consumer parameters for this specific unmarshaler
93      */
94     public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
95         final EventHandlerParameters consumerParameters) {
96         this.name = name;
97         this.engineServiceParameters = engineServiceParameters;
98         this.consumerParameters = consumerParameters;
99     }
100
101     /**
102      * Configure the consumer and initialize the thread for event sending.
103      *
104      * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
105      * @throws ApexEventException on errors initializing event handling
106      */
107     public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
108         this.engineServiceHandler = incomingEngineServiceHandler;
109
110         // Create the consumer for sending events and the converter for transforming events
111         consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
112         consumer.init(this.name, this.consumerParameters, this);
113
114         converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
115     }
116
117     /**
118      * Start the unmarshaler and consumer threads.
119      */
120     public void start() {
121         // Start the consumer
122         consumer.start();
123
124         // Configure and start the event reception thread
125         final String threadName =
126             engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
127         unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
128         unmarshallerThread.setDaemon(true);
129         unmarshallerThread.start();
130     }
131
132     /**
133      * Gets the name of the unmarshaler.
134      *
135      * @return the unmarshaler name
136      */
137     public String getName() {
138         return name;
139     }
140
141     /**
142      * Gets the technology specific consumer for this unmarshaler.
143      *
144      * @return the consumer
145      */
146     public ApexEventConsumer getConsumer() {
147         return consumer;
148     }
149
150     /**
151      * Gets the event protocol converter for this unmarshaler.
152      *
153      * @return the event protocol converter
154      */
155     public ApexEventProtocolConverter getConverter() {
156         return converter;
157     }
158
159     /**
160      * Connect a synchronous unmarshaler with a synchronous marshaler.
161      *
162      * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
163      * @param peeredMarshaller the synchronous marshaler to connect with
164      */
165     public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
166         switch (peeredMode) {
167             case SYNCHRONOUS:
168                 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
169                 // cache on the consumer/producer pair
170                 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
171                     consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
172                 return;
173
174             case REQUESTOR:
175                 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
176                 return;
177
178             default:
179                 return;
180         }
181     }
182
183     /**
184      * {@inheritDoc}.
185      */
186     @Override
187     public void receiveEvent(@NonNull final Properties executionProperties, final Object event)
188         throws ApexEventException {
189         receiveEvent(0, executionProperties, event, true);
190     }
191
192     /**
193      * {@inheritDoc}.
194      */
195     @Override
196     public void receiveEvent(final long executionId, @NonNull final Properties executionProperties, final Object event)
197         throws ApexEventException {
198         receiveEvent(executionId, executionProperties, event, false);
199     }
200
201     /**
202      * Receive an event from a consumer, convert its protocol and forward it to Apex.
203      *
204      * @param executionId the execution id the incoming execution ID
205      * @param executionProperties properties used during processing of this event
206      * @param event the event in its native format
207      * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
208      * @throws ApexEventException on unmarshaling errors on events
209      */
210     private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
211         final boolean generateExecutionId) throws ApexEventException {
212         // Push the event onto the queue
213         if (LOGGER.isTraceEnabled()) {
214             String eventString = "onMessage(): event received: " + event.toString();
215             LOGGER.trace(eventString);
216         }
217
218         // Convert the incoming events to Apex events
219         try {
220             final List<ApexEvent> apexEventList = converter.toApexEvent(consumerParameters.getEventName(), event);
221
222             for (final ApexEvent apexEvent : apexEventList) {
223                 isEventFilteredOut(apexEvent);
224
225                 // Check if this event is filtered out by the incoming filter
226                 if (isEventFilteredOut(apexEvent)) {
227                     // Ignore this event
228                     continue;
229                 }
230
231                 if (!generateExecutionId) {
232                     apexEvent.setExecutionId(executionId);
233                 }
234
235                 apexEvent.setExecutionProperties(executionProperties);
236
237                 // Cache synchronized events that are sent
238                 if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
239                     final SynchronousEventCache synchronousEventCache =
240                         (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
241                     synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
242                 }
243
244                 // Enqueue the event
245                 queue.add(apexEvent);
246             }
247         } catch (final ApexException e) {
248             final String errorMessage = "Error while converting event into an ApexEvent for " + name + ": "
249                 + e.getMessage() + ", Event=" + event;
250             LOGGER.warn(errorMessage, e);
251             throw new ApexEventException(errorMessage, e);
252         }
253     }
254
255     /**
256      * Check if an event is filtered out and ignored.
257      *
258      * @param apexEvent the event to check
259      */
260     private boolean isEventFilteredOut(final ApexEvent apexEvent) {
261         // Check if we are filtering events on this unmarshaler, if so check the event name
262         // against the filter
263         if (consumerParameters.isSetEventNameFilter()
264             && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
265
266             if (LOGGER.isTraceEnabled()) {
267                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
268                     consumerParameters.getEventNameFilter());
269             }
270
271             return true;
272         } else {
273             return false;
274         }
275     }
276
277     /**
278      * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
279      * queue.
280      */
281     @Override
282     public void run() {
283         // Run until interruption
284         while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
285             try {
286                 // Take the next event from the queue
287                 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
288                 if (apexEvent == null) {
289                     continue;
290                 }
291
292                 if (LOGGER.isTraceEnabled()) {
293                     String message = apexEvent.toString();
294                     LOGGER.trace("event received {}", message);
295                 }
296
297                 // Pass the event to the activator for forwarding to Apex
298                 engineServiceHandler.forwardEvent(apexEvent);
299             } catch (final InterruptedException e) {
300                 // restore the interrupt status
301                 Thread.currentThread().interrupt();
302                 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
303                 stopOrderedFlag = true;
304             } catch (final Exception e) {
305                 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
306             }
307         }
308
309         // Stop event production
310         consumer.stop();
311     }
312
313     /**
314      * Get the unmarshaler thread.
315      *
316      * @return the unmarshaler thread
317      */
318     public Thread getThread() {
319         return unmarshallerThread;
320     }
321
322     /**
323      * Stop the Apex event unmarshaller's event producer using its termination mechanism.
324      */
325     public void stop() {
326         LOGGER.entry("shutting down Apex event unmarshaller . . .");
327
328         // Order the stop
329         stopOrderedFlag = true;
330
331         // Order a stop on the synchronous cache if one exists
332         if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
333             && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
334             ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
335         }
336
337         // Wait for thread shutdown
338         while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
339             ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
340         }
341
342         LOGGER.exit("shut down Apex event unmarshaller");
343     }
344 }