8f3bb858f2d5a6452fc284f9efe039920df9e71d
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2021 Nordix Foundation.
5  *  Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
6  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * SPDX-License-Identifier: Apache-2.0
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.apex.service.engine.main;
25
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Properties;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.TimeUnit;
34 import lombok.Getter;
35 import lombok.NonNull;
36 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
39 import org.onap.policy.apex.service.engine.event.ApexEvent;
40 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
41 import org.onap.policy.apex.service.engine.event.ApexEventException;
42 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
43 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
44 import org.onap.policy.apex.service.engine.event.PeeredReference;
45 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
46 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
47 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
48 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
49 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
50 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
51 import org.slf4j.ext.XLogger;
52 import org.slf4j.ext.XLoggerFactory;
53
54 /**
55  * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
56  * receiving using the configured receiving technology.
57  *
58  * @author Liam Fallon (liam.fallon@ericsson.com)
59  */
60 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
61     // Get a reference to the logger
62     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
63
64     // Interval to wait between thread shutdown checks
65     private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
66
67     // The amount of time to wait between polls of the event queue in milliseconds
68     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
69
70     // The name of the unmarshaler
71     @Getter
72     private final String name;
73
74     // The engine service and consumer parameters
75     private final EngineServiceParameters engineServiceParameters;
76     private final EventHandlerParameters consumerParameters;
77
78     // The engine service handler to use for forwarding on of unmarshalled events
79     private ApexEngineServiceHandler engineServiceHandler;
80
81     // Apex event producer and event converter, all events are sent as string representations
82     @Getter
83     private ApexEventConsumer consumer;
84     @Getter
85     private ApexEventProtocolConverter converter;
86
87     // Temporary event holder for events going into Apex
88     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
89
90     // The unmarshaler thread and stopping flag
91     private Thread unmarshallerThread = null;
92     private boolean stopOrderedFlag = false;
93
94     /**
95      * Create the unmarshaler.
96      *
97      * @param name the name of the unmarshaler
98      * @param engineServiceParameters the engine service parameters for this Apex engine
99      * @param consumerParameters the consumer parameters for this specific unmarshaler
100      */
101     public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
102         final EventHandlerParameters consumerParameters) {
103         this.name = name;
104         this.engineServiceParameters = engineServiceParameters;
105         this.consumerParameters = consumerParameters;
106     }
107
108     /**
109      * Configure the consumer and initialize the thread for event sending.
110      *
111      * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
112      * @throws ApexEventException on errors initializing event handling
113      */
114     public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
115         this.engineServiceHandler = incomingEngineServiceHandler;
116
117         // Create the consumer for sending events and the converter for transforming events
118         consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
119         consumer.init(this.name, this.consumerParameters, this);
120
121         converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
122     }
123
124     /**
125      * Start the unmarshaler and consumer threads.
126      */
127     public void start() {
128         // Start the consumer
129         consumer.start();
130
131         // Configure and start the event reception thread
132         final String threadName =
133             engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
134         unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
135         unmarshallerThread.setDaemon(true);
136         unmarshallerThread.start();
137     }
138
139     /**
140      * Connect a synchronous unmarshaler with a synchronous marshaler.
141      *
142      * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
143      * @param peeredMarshaller the synchronous marshaler to connect with
144      */
145     public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
146         switch (peeredMode) {
147             case SYNCHRONOUS:
148                 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
149                 // cache on the consumer/producer pair
150                 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
151                     consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
152                 return;
153
154             case REQUESTOR:
155                 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
156                 return;
157
158             default:
159                 return;
160         }
161     }
162
163     /**
164      * {@inheritDoc}.
165      */
166     @Override
167     public void receiveEvent(@NonNull final Properties executionProperties, final Object event)
168         throws ApexEventException {
169         receiveEvent(0, executionProperties, event, true);
170     }
171
172     /**
173      * {@inheritDoc}.
174      */
175     @Override
176     public void receiveEvent(final long executionId, @NonNull final Properties executionProperties, final Object event)
177         throws ApexEventException {
178         receiveEvent(executionId, executionProperties, event, false);
179     }
180
181     /**
182      * Receive an event from a consumer, convert its protocol and forward it to Apex.
183      *
184      * @param executionId the execution id the incoming execution ID
185      * @param executionProperties properties used during processing of this event
186      * @param event the event in its native format
187      * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
188      * @throws ApexEventException on unmarshaling errors on events
189      */
190     private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
191         final boolean generateExecutionId) throws ApexEventException {
192         // Push the event onto the queue
193         if (LOGGER.isTraceEnabled()) {
194             var eventString = "onMessage(): event received: " + event.toString();
195             LOGGER.trace(eventString);
196         }
197
198         // Convert the incoming events to Apex events
199         List<ApexEvent> apexEventList = convertToApexEvents(event);
200
201         for (final ApexEvent apexEvent : apexEventList) {
202             // Check if this event is filtered out by the incoming filter
203             if (isEventFilteredOut(apexEvent)) {
204                 // Ignore this event
205                 continue;
206             }
207             if (!generateExecutionId) {
208                 apexEvent.setExecutionId(executionId);
209                 apexEvent.setExecutionProperties(executionProperties);
210             } else {
211                 // Clean up executionProperties in case if it is not a response event to a request made from APEX
212                 apexEvent.setExecutionProperties(new Properties(executionProperties));
213             }
214             // Cache synchronized events that are sent
215             if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
216                 final var synchronousEventCache =
217                     (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
218                 synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
219             }
220
221             // Enqueue the event
222             queue.add(apexEvent);
223         }
224     }
225
226     private List<ApexEvent> convertToApexEvents(final Object event) throws ApexEventException {
227         List<ApexEvent> apexEventList = null;
228         List<String> eventNamesList = null;
229         if (consumerParameters.getEventName() != null) {
230             eventNamesList = Arrays.asList(consumerParameters.getEventName().split("\\|"));
231         } else {
232             eventNamesList = Collections.singletonList(null);
233         }
234         Iterator<String> iterator = eventNamesList.iterator();
235         // Incoming events in an endpoint can have different structure , for e.g., success/failure response events
236         // Parse the incoming event into an APEX event defined with any one of the names specified in eventName field
237         while (iterator.hasNext()) {
238             try {
239                 String eventName = iterator.next();
240                 apexEventList = converter.toApexEvent(eventName, event);
241                 break;
242             } catch (ApexException e) {
243                 if (!iterator.hasNext()) {
244                     final String errorMessage = "Error while converting event into an ApexEvent for " + name;
245                     if (!LOGGER.isDebugEnabled()) {
246                         LOGGER.warn("{}. Detailed logs are available at debug level.", errorMessage);
247                     }
248                     throw new ApexEventException(errorMessage, e);
249                 }
250             }
251         }
252         return apexEventList;
253     }
254
255     /**
256      * Check if an event is filtered out and ignored.
257      *
258      * @param apexEvent the event to check
259      */
260     private boolean isEventFilteredOut(final ApexEvent apexEvent) {
261         // Check if we are filtering events on this unmarshaler, if so check the event name
262         // against the filter
263         if (consumerParameters.isSetEventNameFilter()
264             && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
265
266             if (LOGGER.isTraceEnabled()) {
267                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
268                     consumerParameters.getEventNameFilter());
269             }
270
271             return true;
272         } else {
273             return false;
274         }
275     }
276
277     /**
278      * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
279      * queue.
280      */
281     @Override
282     public void run() {
283         // Run until interruption
284         while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
285             try {
286                 // Take the next event from the queue
287                 final var apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
288                 if (apexEvent == null) {
289                     continue;
290                 }
291
292                 if (LOGGER.isTraceEnabled()) {
293                     var message = apexEvent.toString();
294                     LOGGER.trace("event received {}", message);
295                 }
296
297                 // Pass the event to the activator for forwarding to Apex
298                 engineServiceHandler.forwardEvent(apexEvent);
299             } catch (final InterruptedException e) {
300                 // restore the interrupt status
301                 Thread.currentThread().interrupt();
302                 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
303                 stopOrderedFlag = true;
304             } catch (final Exception e) {
305                 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
306             }
307         }
308
309         // Stop event production
310         consumer.stop();
311     }
312
313     /**
314      * Get the unmarshaler thread.
315      *
316      * @return the unmarshaler thread
317      */
318     public Thread getThread() {
319         return unmarshallerThread;
320     }
321
322     /**
323      * Stop the Apex event unmarshaller's event producer using its termination mechanism.
324      */
325     public void stop() {
326         LOGGER.entry("shutting down Apex event unmarshaller . . .");
327
328         // Order the stop
329         stopOrderedFlag = true;
330
331         // Wait for thread shutdown
332         while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
333             ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
334         }
335
336         // Order a stop on the synchronous cache if one exists
337         if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
338             && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
339             ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
340         }
341         LOGGER.exit("shut down Apex event unmarshaller");
342     }
343 }