2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2021 Nordix Foundation.
5 * Modifications Copyright (C) 2020-2022 Bell Canada. All rights reserved.
6 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * SPDX-License-Identifier: Apache-2.0
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.apex.service.engine.main;
26 import com.google.common.base.Strings;
27 import io.prometheus.client.Counter;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Properties;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.TimeUnit;
37 import lombok.NonNull;
38 import org.apache.commons.lang3.EnumUtils;
39 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
40 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
41 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
42 import org.onap.policy.apex.model.basicmodel.concepts.AxToscaPolicyProcessingStatus;
43 import org.onap.policy.apex.service.engine.event.ApexEvent;
44 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
45 import org.onap.policy.apex.service.engine.event.ApexEventException;
46 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
47 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
48 import org.onap.policy.apex.service.engine.event.PeeredReference;
49 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
50 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
51 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
52 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
53 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
54 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
55 import org.onap.policy.common.utils.resources.PrometheusUtils;
56 import org.onap.policy.models.pdp.concepts.PdpStatus;
57 import org.onap.policy.models.pdp.enums.PdpResponseStatus;
58 import org.slf4j.ext.XLogger;
59 import org.slf4j.ext.XLoggerFactory;
62 * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
63 * receiving using the configured receiving technology.
65 * @author Liam Fallon (liam.fallon@ericsson.com)
67 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
68 // Get a reference to the logger
69 private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
71 // Interval to wait between thread shutdown checks
72 private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
74 // The amount of time to wait between polls of the event queue in milliseconds
75 private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
77 private static final String PROMETHEUS_TOTAL_LABEL_VALUE = "TOTAL";
79 // prometheus registration for policy execution metrics
80 static final Counter POLICY_EXECUTED_COUNTER =
81 Counter.build().namespace(PrometheusUtils.PdpType.PDPA.getNamespace())
82 .name(PrometheusUtils.POLICY_EXECUTION_METRIC).labelNames(PrometheusUtils.STATUS_METRIC_LABEL)
83 .help(PrometheusUtils.POLICY_EXECUTION_HELP).register();
85 // The name of the unmarshaler
87 private final String name;
89 // The engine service and consumer parameters
90 private final EngineServiceParameters engineServiceParameters;
91 private final EventHandlerParameters consumerParameters;
93 // The engine service handler to use for forwarding on of unmarshalled events
94 private ApexEngineServiceHandler engineServiceHandler;
96 // Apex event producer and event converter, all events are sent as string representations
98 private ApexEventConsumer consumer;
100 private ApexEventProtocolConverter converter;
102 // Temporary event holder for events going into Apex
103 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
105 // The unmarshaler thread and stopping flag
106 private Thread unmarshallerThread = null;
107 private boolean stopOrderedFlag = false;
110 * Create the unmarshaler.
112 * @param name the name of the unmarshaler
113 * @param engineServiceParameters the engine service parameters for this Apex engine
114 * @param consumerParameters the consumer parameters for this specific unmarshaler
116 public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
117 final EventHandlerParameters consumerParameters) {
119 this.engineServiceParameters = engineServiceParameters;
120 this.consumerParameters = consumerParameters;
124 * Configure the consumer and initialize the thread for event sending.
126 * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
127 * @throws ApexEventException on errors initializing event handling
129 public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
130 this.engineServiceHandler = incomingEngineServiceHandler;
132 // Create the consumer for sending events and the converter for transforming events
133 consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
134 consumer.init(this.name, this.consumerParameters, this);
136 converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
140 * Start the unmarshaler and consumer threads.
142 public void start() {
143 // Start the consumer
146 // Configure and start the event reception thread
147 final String threadName =
148 engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
149 unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
150 unmarshallerThread.setDaemon(true);
151 unmarshallerThread.start();
155 * Connect a synchronous unmarshaler with a synchronous marshaler.
157 * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
158 * @param peeredMarshaller the synchronous marshaler to connect with
160 public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
161 switch (peeredMode) {
163 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
164 // cache on the consumer/producer pair
165 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
166 consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
170 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
182 public void receiveEvent(@NonNull final Properties executionProperties, final Object event)
183 throws ApexEventException {
184 receiveEvent(0, executionProperties, event, true);
191 public void receiveEvent(final long executionId, @NonNull final Properties executionProperties, final Object event)
192 throws ApexEventException {
193 receiveEvent(executionId, executionProperties, event, false);
197 * Receive an event from a consumer, convert its protocol and forward it to Apex.
199 * @param executionId the execution id the incoming execution ID
200 * @param executionProperties properties used during processing of this event
201 * @param event the event in its native format
202 * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
203 * @throws ApexEventException on unmarshaling errors on events
205 private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
206 final boolean generateExecutionId) throws ApexEventException {
207 // Push the event onto the queue
208 if (LOGGER.isTraceEnabled()) {
209 var eventString = "onMessage(): event received: " + event.toString();
210 LOGGER.trace(eventString);
213 // Convert the incoming events to Apex events
214 List<ApexEvent> apexEventList = convertToApexEvents(event);
216 for (final ApexEvent apexEvent : apexEventList) {
217 // Check if this event is filtered out by the incoming filter
218 if (isEventFilteredOut(apexEvent)) {
222 if (!generateExecutionId) {
223 apexEvent.setExecutionId(executionId);
224 apexEvent.setExecutionProperties(executionProperties);
226 // Clean up executionProperties in case if it is not a response event to a request made from APEX
227 apexEvent.setExecutionProperties(new Properties(executionProperties));
229 // Cache synchronized events that are sent
230 if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
231 final var synchronousEventCache =
232 (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
233 synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
236 // Update policy execution metrics
237 updatePolicyExecutedMetrics(apexEvent.getToscaPolicyState());
240 queue.add(apexEvent);
245 * Increment Prometheus counters for TOSCA policy execution metrics.
247 * @param toscaPolicyState the TOSCA Policy state flag from the event
249 private void updatePolicyExecutedMetrics(String toscaPolicyState) {
250 // Skip events that are not flagged as TOSCA processing entry or exit points.
251 if (Strings.isNullOrEmpty(toscaPolicyState)
252 || !EnumUtils.isValidEnum(AxToscaPolicyProcessingStatus.class, toscaPolicyState)) {
256 // Increment total, successful and failed policy executed counter.
257 if (AxToscaPolicyProcessingStatus.ENTRY.name().equals(toscaPolicyState)) {
258 POLICY_EXECUTED_COUNTER.labels(PROMETHEUS_TOTAL_LABEL_VALUE).inc();
259 } else if (AxToscaPolicyProcessingStatus.EXIT_SUCCESS.name().equals(toscaPolicyState)) {
260 POLICY_EXECUTED_COUNTER.labels(PdpResponseStatus.SUCCESS.name()).inc();
261 } else if (AxToscaPolicyProcessingStatus.EXIT_FAILURE.name().equals(toscaPolicyState)) {
262 POLICY_EXECUTED_COUNTER.labels(PdpResponseStatus.FAIL.name()).inc();
266 private List<ApexEvent> convertToApexEvents(final Object event) throws ApexEventException {
267 List<ApexEvent> apexEventList = null;
268 List<String> eventNamesList = null;
269 if (consumerParameters.getEventName() != null) {
270 eventNamesList = Arrays.asList(consumerParameters.getEventName().split("\\|"));
272 eventNamesList = Collections.singletonList(null);
274 Iterator<String> iterator = eventNamesList.iterator();
275 // Incoming events in an endpoint can have different structure , for e.g., success/failure response events
276 // Parse the incoming event into an APEX event defined with any one of the names specified in eventName field
277 while (iterator.hasNext()) {
279 String eventName = iterator.next();
280 apexEventList = converter.toApexEvent(eventName, event);
282 } catch (ApexException e) {
283 if (!iterator.hasNext()) {
284 final String errorMessage = "Error while converting event into an ApexEvent for " + name;
285 if (!LOGGER.isDebugEnabled()) {
286 LOGGER.warn("{}. Detailed logs are available at debug level.", errorMessage);
288 throw new ApexEventException(errorMessage, e);
292 return apexEventList;
296 * Check if an event is filtered out and ignored.
298 * @param apexEvent the event to check
300 private boolean isEventFilteredOut(final ApexEvent apexEvent) {
301 // Check if we are filtering events on this unmarshaler, if so check the event name
302 // against the filter
303 if (consumerParameters.isSetEventNameFilter()
304 && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
306 if (LOGGER.isTraceEnabled()) {
307 LOGGER.trace("onMessage(): event {} not processed, filtered out by filter", apexEvent,
308 consumerParameters.getEventNameFilter());
318 * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
323 // Run until interruption
324 while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
326 // Take the next event from the queue
327 final var apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
328 if (apexEvent == null) {
332 if (LOGGER.isTraceEnabled()) {
333 var message = apexEvent.toString();
334 LOGGER.trace("event received {}", message);
337 // Pass the event to the activator for forwarding to Apex
338 engineServiceHandler.forwardEvent(apexEvent);
339 } catch (final InterruptedException e) {
340 // restore the interrupt status
341 Thread.currentThread().interrupt();
342 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
343 stopOrderedFlag = true;
344 } catch (final Exception e) {
345 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
349 // Stop event production
354 * Get the unmarshaler thread.
356 * @return the unmarshaler thread
358 public Thread getThread() {
359 return unmarshallerThread;
363 * Stop the Apex event unmarshaller's event producer using its termination mechanism.
366 LOGGER.entry("shutting down Apex event unmarshaller . . .");
369 stopOrderedFlag = true;
371 // Wait for thread shutdown
372 while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
373 ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
376 // Order a stop on the synchronous cache if one exists
377 if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
378 && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
379 ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
381 LOGGER.exit("shut down Apex event unmarshaller");