Merge "Fix Sonar/Checkstyle issues in apex-pdp"
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexEventUnmarshaller.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.main;
22
23 import java.util.List;
24 import java.util.Properties;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.TimeUnit;
28
29 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
32 import org.onap.policy.apex.service.engine.event.ApexEvent;
33 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
34 import org.onap.policy.apex.service.engine.event.ApexEventException;
35 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
36 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
37 import org.onap.policy.apex.service.engine.event.PeeredReference;
38 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
39 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
40 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
41 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
44 import org.slf4j.ext.XLogger;
45 import org.slf4j.ext.XLoggerFactory;
46
47 /**
48  * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
49  * receiving using the configured receiving technology.
50  *
51  * @author Liam Fallon (liam.fallon@ericsson.com)
52  */
53 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
54     // Get a reference to the logger
55     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
56
57     // Interval to wait between thread shutdown checks
58     private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
59
60     // The amount of time to wait between polls of the event queue in milliseconds
61     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
62
63     // The name of the unmarshaler
64     private final String name;
65
66     // The engine service and consumer parameters
67     private final EngineServiceParameters engineServiceParameters;
68     private final EventHandlerParameters consumerParameters;
69
70     // The engine service handler to use for forwarding on of unmarshalled events
71     private ApexEngineServiceHandler engineServiceHandler;
72
73     // Apex event producer and event converter, all events are sent as string representations
74     private ApexEventConsumer consumer;
75     private ApexEventProtocolConverter converter;
76
77     // Temporary event holder for events going into Apex
78     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
79
80     // The unmarshaler thread and stopping flag
81     private Thread unmarshallerThread = null;
82     private boolean stopOrderedFlag = false;
83
84     /**
85      * Create the unmarshaler.
86      *
87      * @param name the name of the unmarshaler
88      * @param engineServiceParameters the engine service parameters for this Apex engine
89      * @param consumerParameters the consumer parameters for this specific unmarshaler
90      */
91     public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
92             final EventHandlerParameters consumerParameters) {
93         this.name = name;
94         this.engineServiceParameters = engineServiceParameters;
95         this.consumerParameters = consumerParameters;
96     }
97
98     /**
99      * Configure the consumer and initialize the thread for event sending.
100      *
101      * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
102      * @throws ApexEventException on errors initializing event handling
103      */
104     public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
105         this.engineServiceHandler = incomingEngineServiceHandler;
106
107         // Create the consumer for sending events and the converter for transforming events
108         consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
109         consumer.init(this.name, this.consumerParameters, this);
110
111         converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
112     }
113
114     /**
115      * Start the unmarshaler and consumer threads.
116      */
117     public void start() {
118         // Start the consumer
119         consumer.start();
120
121         // Configure and start the event reception thread
122         final String threadName =
123                 engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
124         unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
125         unmarshallerThread.setDaemon(true);
126         unmarshallerThread.start();
127     }
128
129     /**
130      * Gets the name of the unmarshaler.
131      *
132      * @return the unmarshaler name
133      */
134     public String getName() {
135         return name;
136     }
137
138     /**
139      * Gets the technology specific consumer for this unmarshaler.
140      *
141      * @return the consumer
142      */
143     public ApexEventConsumer getConsumer() {
144         return consumer;
145     }
146
147     /**
148      * Gets the event protocol converter for this unmarshaler.
149      *
150      * @return the event protocol converter
151      */
152     public ApexEventProtocolConverter getConverter() {
153         return converter;
154     }
155
156     /**
157      * Connect a synchronous unmarshaler with a synchronous marshaler.
158      *
159      * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
160      * @param peeredMarshaller the synchronous marshaler to connect with
161      */
162     public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
163         switch (peeredMode) {
164             case SYNCHRONOUS:
165                 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
166                 // cache on the consumer/producer pair
167                 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
168                         consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
169                 return;
170
171             case REQUESTOR:
172                 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
173                 return;
174
175             default:
176                 return;
177         }
178     }
179
180     /**
181      * {@inheritDoc}
182      */
183     @Override
184     public void receiveEvent(final Properties executionProperties, final Object event) throws ApexEventException {
185         receiveEvent(0, executionProperties, event, true);
186     }
187
188     /**
189      * {@inheritDoc}
190      */
191     @Override
192     public void receiveEvent(final long executionId, final Properties executionProperties, final Object event)
193             throws ApexEventException {
194         receiveEvent(executionId, executionProperties, event, false);
195     }
196
197     /**
198      * Receive an event from a consumer, convert its protocol and forward it to Apex.
199      *
200      * @param executionId the execution id the incoming execution ID
201      * @param executionProperties properties used during processing of this event
202      * @param event the event in its native format
203      * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
204      * @throws ApexEventException on unmarshaling errors on events
205      */
206     private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
207             final boolean generateExecutionId) throws ApexEventException {
208         // Push the event onto the queue
209         if (LOGGER.isTraceEnabled()) {
210             String eventString = "onMessage(): event received: " + event.toString();
211             LOGGER.trace(eventString);
212         }
213
214         // Convert the incoming events to Apex events
215         try {
216             final List<ApexEvent> apexEventList = converter.toApexEvent(consumerParameters.getEventName(), event);
217
218             for (final ApexEvent apexEvent : apexEventList) {
219                 isEventFilteredOut(apexEvent);
220
221                 // Check if this event is filtered out by the incoming filter
222                 if (isEventFilteredOut(apexEvent)) {
223                     // Ignore this event
224                     continue;
225                 }
226
227                 if (!generateExecutionId) {
228                     apexEvent.setExecutionId(executionId);
229                 }
230
231                 apexEvent.setExecutionProperties(executionProperties);
232
233                 // Enqueue the event
234                 queue.add(apexEvent);
235
236                 // Cache synchronized events that are sent
237                 if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
238                     final SynchronousEventCache synchronousEventCache =
239                             (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
240                     synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
241                 }
242             }
243         } catch (final ApexException e) {
244             final String errorMessage = "Error while converting event into an ApexEvent for " + name + ": "
245                     + e.getMessage() + ", Event=" + event;
246             LOGGER.warn(errorMessage, e);
247             throw new ApexEventException(errorMessage, e);
248         }
249     }
250
251     /**
252      * Check if an event is filtered out and ignored.
253      *
254      * @param apexEvent the event to check
255      */
256     private boolean isEventFilteredOut(final ApexEvent apexEvent) {
257         // Check if we are filtering events on this unmarshaler, if so check the event name
258         // against the filter
259         if (consumerParameters.isSetEventNameFilter()
260                 && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
261
262             if (LOGGER.isTraceEnabled()) {
263                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
264                         consumerParameters.getEventNameFilter());
265             }
266
267             return true;
268         } else {
269             return false;
270         }
271     }
272
273     /**
274      * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
275      * queue.
276      */
277     @Override
278     public void run() {
279         // Run until interruption
280         while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
281             try {
282                 // Take the next event from the queue
283                 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
284                 if (apexEvent == null) {
285                     continue;
286                 }
287
288                 if (LOGGER.isTraceEnabled()) {
289                     String message = apexEvent.toString();
290                     LOGGER.trace("event received {}", message);
291                 }
292
293                 // Pass the event to the activator for forwarding to Apex
294                 engineServiceHandler.forwardEvent(apexEvent);
295             } catch (final InterruptedException e) {
296                 // restore the interrupt status
297                 Thread.currentThread().interrupt();
298                 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
299                 stopOrderedFlag = true;
300             } catch (final Exception e) {
301                 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
302             }
303         }
304
305         // Stop event production
306         consumer.stop();
307     }
308
309     /**
310      * Get the unmarshaler thread.
311      *
312      * @return the unmarshaler thread
313      */
314     public Thread getThread() {
315         return unmarshallerThread;
316     }
317
318     /**
319      * Stop the Apex event unmarshaller's event producer using its termination mechanism.
320      */
321     public void stop() {
322         LOGGER.entry("shutting down Apex event unmarshaller . . .");
323
324         // Order the stop
325         stopOrderedFlag = true;
326
327         // Order a stop on the synchronous cache if one exists
328         if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
329                 && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
330             ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
331         }
332
333         // Wait for thread shutdown
334         while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
335             ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
336         }
337
338         LOGGER.exit("shut down Apex event unmarshaller");
339     }
340 }