Changes for checkstyle 8.32
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexEventUnmarshaller.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.main;
23
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import lombok.NonNull;
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
33 import org.onap.policy.apex.service.engine.event.ApexEvent;
34 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
35 import org.onap.policy.apex.service.engine.event.ApexEventException;
36 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
37 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
38 import org.onap.policy.apex.service.engine.event.PeeredReference;
39 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
40 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
41 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
42 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
44 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
45 import org.slf4j.ext.XLogger;
46 import org.slf4j.ext.XLoggerFactory;
47
48 /**
49  * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
50  * receiving using the configured receiving technology.
51  *
52  * @author Liam Fallon (liam.fallon@ericsson.com)
53  */
54 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
55     // Get a reference to the logger
56     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
57
58     // Interval to wait between thread shutdown checks
59     private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
60
61     // The amount of time to wait between polls of the event queue in milliseconds
62     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
63
64     // The name of the unmarshaler
65     private final String name;
66
67     // The engine service and consumer parameters
68     private final EngineServiceParameters engineServiceParameters;
69     private final EventHandlerParameters consumerParameters;
70
71     // The engine service handler to use for forwarding on of unmarshalled events
72     private ApexEngineServiceHandler engineServiceHandler;
73
74     // Apex event producer and event converter, all events are sent as string representations
75     private ApexEventConsumer consumer;
76     private ApexEventProtocolConverter converter;
77
78     // Temporary event holder for events going into Apex
79     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
80
81     // The unmarshaler thread and stopping flag
82     private Thread unmarshallerThread = null;
83     private boolean stopOrderedFlag = false;
84
85     /**
86      * Create the unmarshaler.
87      *
88      * @param name the name of the unmarshaler
89      * @param engineServiceParameters the engine service parameters for this Apex engine
90      * @param consumerParameters the consumer parameters for this specific unmarshaler
91      */
92     public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
93         final EventHandlerParameters consumerParameters) {
94         this.name = name;
95         this.engineServiceParameters = engineServiceParameters;
96         this.consumerParameters = consumerParameters;
97     }
98
99     /**
100      * Configure the consumer and initialize the thread for event sending.
101      *
102      * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
103      * @throws ApexEventException on errors initializing event handling
104      */
105     public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
106         this.engineServiceHandler = incomingEngineServiceHandler;
107
108         // Create the consumer for sending events and the converter for transforming events
109         consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
110         consumer.init(this.name, this.consumerParameters, this);
111
112         converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
113     }
114
115     /**
116      * Start the unmarshaler and consumer threads.
117      */
118     public void start() {
119         // Start the consumer
120         consumer.start();
121
122         // Configure and start the event reception thread
123         final String threadName =
124             engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
125         unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
126         unmarshallerThread.setDaemon(true);
127         unmarshallerThread.start();
128     }
129
130     /**
131      * Gets the name of the unmarshaler.
132      *
133      * @return the unmarshaler name
134      */
135     public String getName() {
136         return name;
137     }
138
139     /**
140      * Gets the technology specific consumer for this unmarshaler.
141      *
142      * @return the consumer
143      */
144     public ApexEventConsumer getConsumer() {
145         return consumer;
146     }
147
148     /**
149      * Gets the event protocol converter for this unmarshaler.
150      *
151      * @return the event protocol converter
152      */
153     public ApexEventProtocolConverter getConverter() {
154         return converter;
155     }
156
157     /**
158      * Connect a synchronous unmarshaler with a synchronous marshaler.
159      *
160      * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
161      * @param peeredMarshaller the synchronous marshaler to connect with
162      */
163     public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
164         switch (peeredMode) {
165             case SYNCHRONOUS:
166                 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
167                 // cache on the consumer/producer pair
168                 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
169                     consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
170                 return;
171
172             case REQUESTOR:
173                 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
174                 return;
175
176             default:
177                 return;
178         }
179     }
180
181     /**
182      * {@inheritDoc}.
183      */
184     @Override
185     public void receiveEvent(@NonNull final Properties executionProperties, final Object event)
186         throws ApexEventException {
187         receiveEvent(0, executionProperties, event, true);
188     }
189
190     /**
191      * {@inheritDoc}.
192      */
193     @Override
194     public void receiveEvent(final long executionId, @NonNull final Properties executionProperties, final Object event)
195         throws ApexEventException {
196         receiveEvent(executionId, executionProperties, event, false);
197     }
198
199     /**
200      * Receive an event from a consumer, convert its protocol and forward it to Apex.
201      *
202      * @param executionId the execution id the incoming execution ID
203      * @param executionProperties properties used during processing of this event
204      * @param event the event in its native format
205      * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
206      * @throws ApexEventException on unmarshaling errors on events
207      */
208     private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
209         final boolean generateExecutionId) throws ApexEventException {
210         // Push the event onto the queue
211         if (LOGGER.isTraceEnabled()) {
212             String eventString = "onMessage(): event received: " + event.toString();
213             LOGGER.trace(eventString);
214         }
215
216         // Convert the incoming events to Apex events
217         try {
218             final List<ApexEvent> apexEventList = converter.toApexEvent(consumerParameters.getEventName(), event);
219
220             for (final ApexEvent apexEvent : apexEventList) {
221                 isEventFilteredOut(apexEvent);
222
223                 // Check if this event is filtered out by the incoming filter
224                 if (isEventFilteredOut(apexEvent)) {
225                     // Ignore this event
226                     continue;
227                 }
228
229                 if (!generateExecutionId) {
230                     apexEvent.setExecutionId(executionId);
231                 }
232
233                 apexEvent.setExecutionProperties(executionProperties);
234
235                 // Cache synchronized events that are sent
236                 if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
237                     final SynchronousEventCache synchronousEventCache =
238                         (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
239                     synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
240                 }
241
242                 // Enqueue the event
243                 queue.add(apexEvent);
244             }
245         } catch (final ApexException e) {
246             final String errorMessage = "Error while converting event into an ApexEvent for " + name + ": "
247                 + e.getMessage() + ", Event=" + event;
248             LOGGER.warn(errorMessage, e);
249             throw new ApexEventException(errorMessage, e);
250         }
251     }
252
253     /**
254      * Check if an event is filtered out and ignored.
255      *
256      * @param apexEvent the event to check
257      */
258     private boolean isEventFilteredOut(final ApexEvent apexEvent) {
259         // Check if we are filtering events on this unmarshaler, if so check the event name
260         // against the filter
261         if (consumerParameters.isSetEventNameFilter()
262             && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
263
264             if (LOGGER.isTraceEnabled()) {
265                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
266                     consumerParameters.getEventNameFilter());
267             }
268
269             return true;
270         } else {
271             return false;
272         }
273     }
274
275     /**
276      * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
277      * queue.
278      */
279     @Override
280     public void run() {
281         // Run until interruption
282         while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
283             try {
284                 // Take the next event from the queue
285                 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
286                 if (apexEvent == null) {
287                     continue;
288                 }
289
290                 if (LOGGER.isTraceEnabled()) {
291                     String message = apexEvent.toString();
292                     LOGGER.trace("event received {}", message);
293                 }
294
295                 // Pass the event to the activator for forwarding to Apex
296                 engineServiceHandler.forwardEvent(apexEvent);
297             } catch (final InterruptedException e) {
298                 // restore the interrupt status
299                 Thread.currentThread().interrupt();
300                 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
301                 stopOrderedFlag = true;
302             } catch (final Exception e) {
303                 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
304             }
305         }
306
307         // Stop event production
308         consumer.stop();
309     }
310
311     /**
312      * Get the unmarshaler thread.
313      *
314      * @return the unmarshaler thread
315      */
316     public Thread getThread() {
317         return unmarshallerThread;
318     }
319
320     /**
321      * Stop the Apex event unmarshaller's event producer using its termination mechanism.
322      */
323     public void stop() {
324         LOGGER.entry("shutting down Apex event unmarshaller . . .");
325
326         // Order the stop
327         stopOrderedFlag = true;
328
329         // Order a stop on the synchronous cache if one exists
330         if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
331             && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
332             ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
333         }
334
335         // Wait for thread shutdown
336         while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
337             ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
338         }
339
340         LOGGER.exit("shut down Apex event unmarshaller");
341     }
342 }