fdc404c67cec5b8f5d078ab9cf024872dcb8f5bc
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  *  Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.service.engine.main;
24
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Properties;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.TimeUnit;
33 import lombok.NonNull;
34 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
35 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
36 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
37 import org.onap.policy.apex.service.engine.event.ApexEvent;
38 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
39 import org.onap.policy.apex.service.engine.event.ApexEventException;
40 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
41 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
42 import org.onap.policy.apex.service.engine.event.PeeredReference;
43 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
44 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
45 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
46 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
47 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
48 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
51
52 /**
53  * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
54  * receiving using the configured receiving technology.
55  *
56  * @author Liam Fallon (liam.fallon@ericsson.com)
57  */
58 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
59     // Get a reference to the logger
60     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
61
62     // Interval to wait between thread shutdown checks
63     private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
64
65     // The amount of time to wait between polls of the event queue in milliseconds
66     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
67
68     // The name of the unmarshaler
69     private final String name;
70
71     // The engine service and consumer parameters
72     private final EngineServiceParameters engineServiceParameters;
73     private final EventHandlerParameters consumerParameters;
74
75     // The engine service handler to use for forwarding on of unmarshalled events
76     private ApexEngineServiceHandler engineServiceHandler;
77
78     // Apex event producer and event converter, all events are sent as string representations
79     private ApexEventConsumer consumer;
80     private ApexEventProtocolConverter converter;
81
82     // Temporary event holder for events going into Apex
83     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
84
85     // The unmarshaler thread and stopping flag
86     private Thread unmarshallerThread = null;
87     private boolean stopOrderedFlag = false;
88
89     /**
90      * Create the unmarshaler.
91      *
92      * @param name the name of the unmarshaler
93      * @param engineServiceParameters the engine service parameters for this Apex engine
94      * @param consumerParameters the consumer parameters for this specific unmarshaler
95      */
96     public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
97         final EventHandlerParameters consumerParameters) {
98         this.name = name;
99         this.engineServiceParameters = engineServiceParameters;
100         this.consumerParameters = consumerParameters;
101     }
102
103     /**
104      * Configure the consumer and initialize the thread for event sending.
105      *
106      * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
107      * @throws ApexEventException on errors initializing event handling
108      */
109     public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
110         this.engineServiceHandler = incomingEngineServiceHandler;
111
112         // Create the consumer for sending events and the converter for transforming events
113         consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
114         consumer.init(this.name, this.consumerParameters, this);
115
116         converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
117     }
118
119     /**
120      * Start the unmarshaler and consumer threads.
121      */
122     public void start() {
123         // Start the consumer
124         consumer.start();
125
126         // Configure and start the event reception thread
127         final String threadName =
128             engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
129         unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
130         unmarshallerThread.setDaemon(true);
131         unmarshallerThread.start();
132     }
133
134     /**
135      * Gets the name of the unmarshaler.
136      *
137      * @return the unmarshaler name
138      */
139     public String getName() {
140         return name;
141     }
142
143     /**
144      * Gets the technology specific consumer for this unmarshaler.
145      *
146      * @return the consumer
147      */
148     public ApexEventConsumer getConsumer() {
149         return consumer;
150     }
151
152     /**
153      * Gets the event protocol converter for this unmarshaler.
154      *
155      * @return the event protocol converter
156      */
157     public ApexEventProtocolConverter getConverter() {
158         return converter;
159     }
160
161     /**
162      * Connect a synchronous unmarshaler with a synchronous marshaler.
163      *
164      * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
165      * @param peeredMarshaller the synchronous marshaler to connect with
166      */
167     public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
168         switch (peeredMode) {
169             case SYNCHRONOUS:
170                 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
171                 // cache on the consumer/producer pair
172                 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
173                     consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
174                 return;
175
176             case REQUESTOR:
177                 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
178                 return;
179
180             default:
181                 return;
182         }
183     }
184
185     /**
186      * {@inheritDoc}.
187      */
188     @Override
189     public void receiveEvent(@NonNull final Properties executionProperties, final Object event)
190         throws ApexEventException {
191         receiveEvent(0, executionProperties, event, true);
192     }
193
194     /**
195      * {@inheritDoc}.
196      */
197     @Override
198     public void receiveEvent(final long executionId, @NonNull final Properties executionProperties, final Object event)
199         throws ApexEventException {
200         receiveEvent(executionId, executionProperties, event, false);
201     }
202
203     /**
204      * Receive an event from a consumer, convert its protocol and forward it to Apex.
205      *
206      * @param executionId the execution id the incoming execution ID
207      * @param executionProperties properties used during processing of this event
208      * @param event the event in its native format
209      * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
210      * @throws ApexEventException on unmarshaling errors on events
211      */
212     private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
213         final boolean generateExecutionId) throws ApexEventException {
214         // Push the event onto the queue
215         if (LOGGER.isTraceEnabled()) {
216             String eventString = "onMessage(): event received: " + event.toString();
217             LOGGER.trace(eventString);
218         }
219
220         // Convert the incoming events to Apex events
221         List<ApexEvent> apexEventList = convertToApexEvents(event);
222
223         for (final ApexEvent apexEvent : apexEventList) {
224             // Check if this event is filtered out by the incoming filter
225             if (isEventFilteredOut(apexEvent)) {
226                 // Ignore this event
227                 continue;
228             }
229
230             if (!generateExecutionId) {
231                 apexEvent.setExecutionId(executionId);
232             }
233
234             apexEvent.setExecutionProperties(executionProperties);
235
236             // Cache synchronized events that are sent
237             if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
238                 final SynchronousEventCache synchronousEventCache =
239                     (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
240                 synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
241             }
242
243             // Enqueue the event
244             queue.add(apexEvent);
245         }
246     }
247
248     private List<ApexEvent> convertToApexEvents(final Object event) throws ApexEventException {
249         List<ApexEvent> apexEventList = null;
250         List<String> eventNamesList = null;
251         if (consumerParameters.getEventName() != null) {
252             eventNamesList = Arrays.asList(consumerParameters.getEventName().split("\\|"));
253         } else {
254             eventNamesList = Collections.singletonList(null);
255         }
256         Iterator<String> iterator = eventNamesList.iterator();
257         // Incoming events in an endpoint can have different structure , for e.g., success/failure response events
258         // Parse the incoming event into an APEX event defined with any one of the names specified in eventName field
259         while (iterator.hasNext()) {
260             try {
261                 String eventName = iterator.next();
262                 apexEventList = converter.toApexEvent(eventName, event);
263                 break;
264             } catch (ApexException e) {
265                 if (!iterator.hasNext()) {
266                     final String errorMessage = "Error while converting event into an ApexEvent for " + name;
267                     throw new ApexEventException(errorMessage, e);
268                 }
269             }
270         }
271         return apexEventList;
272     }
273
274     /**
275      * Check if an event is filtered out and ignored.
276      *
277      * @param apexEvent the event to check
278      */
279     private boolean isEventFilteredOut(final ApexEvent apexEvent) {
280         // Check if we are filtering events on this unmarshaler, if so check the event name
281         // against the filter
282         if (consumerParameters.isSetEventNameFilter()
283             && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
284
285             if (LOGGER.isTraceEnabled()) {
286                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
287                     consumerParameters.getEventNameFilter());
288             }
289
290             return true;
291         } else {
292             return false;
293         }
294     }
295
296     /**
297      * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
298      * queue.
299      */
300     @Override
301     public void run() {
302         // Run until interruption
303         while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
304             try {
305                 // Take the next event from the queue
306                 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
307                 if (apexEvent == null) {
308                     continue;
309                 }
310
311                 if (LOGGER.isTraceEnabled()) {
312                     String message = apexEvent.toString();
313                     LOGGER.trace("event received {}", message);
314                 }
315
316                 // Pass the event to the activator for forwarding to Apex
317                 engineServiceHandler.forwardEvent(apexEvent);
318             } catch (final InterruptedException e) {
319                 // restore the interrupt status
320                 Thread.currentThread().interrupt();
321                 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
322                 stopOrderedFlag = true;
323             } catch (final Exception e) {
324                 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
325             }
326         }
327
328         // Stop event production
329         consumer.stop();
330     }
331
332     /**
333      * Get the unmarshaler thread.
334      *
335      * @return the unmarshaler thread
336      */
337     public Thread getThread() {
338         return unmarshallerThread;
339     }
340
341     /**
342      * Stop the Apex event unmarshaller's event producer using its termination mechanism.
343      */
344     public void stop() {
345         LOGGER.entry("shutting down Apex event unmarshaller . . .");
346
347         // Order the stop
348         stopOrderedFlag = true;
349
350         // Wait for thread shutdown
351         while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
352             ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
353         }
354
355         // Order a stop on the synchronous cache if one exists
356         if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
357             && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
358             ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
359         }
360         LOGGER.exit("shut down Apex event unmarshaller");
361     }
362 }