Changes for checkstyle 8.32
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexEventMarshaller.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.main;
22
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
27 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
28 import org.onap.policy.apex.service.engine.event.ApexEvent;
29 import org.onap.policy.apex.service.engine.event.ApexEventException;
30 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
31 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
32 import org.onap.policy.apex.service.engine.event.impl.EventProducerFactory;
33 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
34 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
35 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
36 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
37 import org.slf4j.ext.XLogger;
38 import org.slf4j.ext.XLoggerFactory;
39
40 /**
41  * This event marshaler handles events coming out of Apex and sends them on, handles threading, event queuing,
42  * transformations and sending using the configured sending technology.
43  *
44  * @author Liam Fallon (liam.fallon@ericsson.com)
45  */
46 public class ApexEventMarshaller implements ApexEventListener, Runnable {
47     // Get a reference to the logger
48     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventMarshaller.class);
49
50     // Interval to wait between thread shutdown checks
51     private static final int MARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
52
53     // The amount of time to wait between polls of the event queue in milliseconds
54     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
55
56     // The name of the marshaler
57     private final String name;
58
59     // The engine service and producer parameters
60     private final EngineServiceParameters engineServiceParameters;
61     private final EventHandlerParameters producerParameters;
62
63     // Apex event producer and event converter, all conversions are to and from string
64     // representation of events
65     private ApexEventProducer producer;
66     private ApexEventProtocolConverter converter;
67
68     // Temporary event holder for events coming out of Apex
69     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
70
71     // The marshaler thread and stopping flag
72     private Thread marshallerThread;
73     private boolean stopOrderedFlag = false;
74
75     /**
76      * Create the marshaler.
77      *
78      * @param name the name of the marshaler
79      * @param engineServiceParameters the engine service parameters for this Apex engine
80      * @param producerParameters the producer parameters for this specific marshaler
81      */
82     public ApexEventMarshaller(final String name, final EngineServiceParameters engineServiceParameters,
83             final EventHandlerParameters producerParameters) {
84         this.name = name;
85         this.engineServiceParameters = engineServiceParameters;
86         this.producerParameters = producerParameters;
87     }
88
89     /**
90      * Configure the marshaler by setting up the producer and event converter and initialize the thread for event
91      * sending.
92      *
93      * @throws ApexEventException on errors initializing event handling
94      */
95     public void init() throws ApexEventException {
96         // Create the producer for sending events and the converter for transforming events
97         producer = new EventProducerFactory().createProducer(name, producerParameters);
98
99         // Initialize the producer
100         producer.init(this.name, this.producerParameters);
101
102         // Create the converter for transforming events
103         converter = new EventProtocolFactory().createConverter(name, producerParameters.getEventProtocolParameters());
104
105         // Configure and start the event sending thread
106         final String threadName =
107                 engineServiceParameters.getEngineKey().getName() + ':' + this.getClass().getName() + ':' + this.name;
108         marshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
109         marshallerThread.setDaemon(true);
110         marshallerThread.start();
111     }
112
113     /**
114      * Gets the name of the marshaler.
115      *
116      * @return the marshaler name
117      */
118     public String getName() {
119         return name;
120     }
121
122     /**
123      * Gets the technology specific producer for this marshaler.
124      *
125      * @return the producer
126      */
127     public ApexEventProducer getProducer() {
128         return producer;
129     }
130
131     /**
132      * Gets the event protocol converter for this marshaler.
133      *
134      * @return the event protocol converter
135      */
136     public ApexEventProtocolConverter getConverter() {
137         return converter;
138     }
139
140     /**
141      * Callback method called on implementations of this interface when Apex emits an event.
142      *
143      * @param apexEvent the apex event emitted by Apex
144      */
145     @Override
146     public void onApexEvent(final ApexEvent apexEvent) {
147         // Check if we are filtering events on this marshaler, if so check the event name against
148         // the filter
149         if (producerParameters.isSetEventNameFilter()
150                 && !apexEvent.getName().matches(producerParameters.getEventNameFilter())) {
151             if (LOGGER.isTraceEnabled()) {
152                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
153                         producerParameters.getEventNameFilter());
154             }
155
156             // Ignore this event
157             return;
158         }
159
160         // Push the event onto the queue for handling
161         try {
162             queue.put(apexEvent);
163         } catch (final InterruptedException e) {
164             // restore the interrupt status
165             Thread.currentThread().interrupt();
166             LOGGER.warn("Queueing of the event was interrupted: " + apexEvent, e);
167         }
168     }
169
170     /**
171      * Run a thread that runs forever (well until system termination anyway) and listens for outgoing events on the
172      * queue.
173      */
174     @Override
175     public void run() {
176         // Run until interrupted
177         while (marshallerThread.isAlive() && !stopOrderedFlag) {
178             try {
179                 // Take the next event from the queue
180                 final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
181                 if (apexEvent == null) {
182                     continue;
183                 }
184
185                 // Process the next Apex event from the queue
186                 final Object event = converter.fromApexEvent(apexEvent);
187
188                 producer.sendEvent(apexEvent.getExecutionId(), apexEvent.getExecutionProperties(), apexEvent.getName(),
189                         event);
190
191                 if (LOGGER.isTraceEnabled()) {
192                     final String message = "event sent : " + apexEvent.toString();
193                     LOGGER.trace(message);
194                 }
195             } catch (final InterruptedException e) {
196                 // restore the interrupt status
197                 Thread.currentThread().interrupt();
198                 LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
199                 stopOrderedFlag = true;
200             } catch (final Exception e) {
201                 LOGGER.warn("Error while forwarding events for " + marshallerThread.getName(), e);
202             }
203         }
204
205         // Stop event production if we are not synchronized,;in the synchronized case, the producer
206         // takes care of its own cleanup.
207         producer.stop();
208     }
209
210     /**
211      * Get the marshaler thread.
212      *
213      * @return the marshaler thread
214      */
215     public Thread getThread() {
216         return marshallerThread;
217     }
218
219     /**
220      * Stop the Apex event marshaller's event producer using its termination mechanism.
221      */
222     public void stop() {
223         LOGGER.entry("shutting down Apex event marshaller . . .");
224
225         // Order the stop
226         stopOrderedFlag = true;
227
228         // Wait for thread shutdown
229         while (marshallerThread.isAlive()) {
230             ThreadUtilities.sleep(MARSHALLER_SHUTDOWN_WAIT_INTERVAL);
231         }
232
233         LOGGER.exit("shut down Apex event marshaller");
234     }
235 }