Add github workflow to trigger weekly performance tests
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexEventUnmarshaller.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2022 Nordix Foundation.
5  *  Modifications Copyright (C) 2020-2022 Bell Canada. All rights reserved.
6  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * SPDX-License-Identifier: Apache-2.0
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.apex.service.engine.main;
25
26 import com.google.common.base.Strings;
27 import io.prometheus.client.Counter;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Properties;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import lombok.Getter;
37 import lombok.NonNull;
38 import org.apache.commons.lang3.EnumUtils;
39 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
40 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
41 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
42 import org.onap.policy.apex.model.basicmodel.concepts.AxToscaPolicyProcessingStatus;
43 import org.onap.policy.apex.service.engine.event.ApexEvent;
44 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
45 import org.onap.policy.apex.service.engine.event.ApexEventException;
46 import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
47 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
48 import org.onap.policy.apex.service.engine.event.PeeredReference;
49 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
50 import org.onap.policy.apex.service.engine.event.impl.EventConsumerFactory;
51 import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
52 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
53 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
54 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
55 import org.onap.policy.common.utils.resources.PrometheusUtils;
56 import org.onap.policy.models.pdp.enums.PdpResponseStatus;
57 import org.slf4j.ext.XLogger;
58 import org.slf4j.ext.XLoggerFactory;
59
60 /**
61  * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
62  * receiving using the configured receiving technology.
63  *
64  * @author Liam Fallon (liam.fallon@ericsson.com)
65  */
66 public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
67     // Get a reference to the logger
68     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventUnmarshaller.class);
69
70     // Interval to wait between thread shutdown checks
71     private static final int UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;
72
73     // The amount of time to wait between polls of the event queue in milliseconds
74     private static final long EVENT_QUEUE_POLL_INTERVAL = 20;
75
76     private static final String PROMETHEUS_TOTAL_LABEL_VALUE = "TOTAL";
77
78     // prometheus registration for policy execution metrics
79     static final Counter POLICY_EXECUTED_COUNTER =
80         Counter.build().namespace(PrometheusUtils.PdpType.PDPA.getNamespace())
81             .name(PrometheusUtils.POLICY_EXECUTION_METRIC).labelNames(PrometheusUtils.STATUS_METRIC_LABEL)
82             .help(PrometheusUtils.POLICY_EXECUTION_HELP).register();
83
84     // The name of the unmarshaler
85     @Getter
86     private final String name;
87
88     // The engine service and consumer parameters
89     private final EngineServiceParameters engineServiceParameters;
90     private final EventHandlerParameters consumerParameters;
91
92     // The engine service handler to use for forwarding on of unmarshalled events
93     private ApexEngineServiceHandler engineServiceHandler;
94
95     // Apex event producer and event converter, all events are sent as string representations
96     @Getter
97     private ApexEventConsumer consumer;
98     @Getter
99     private ApexEventProtocolConverter converter;
100
101     // Temporary event holder for events going into Apex
102     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
103
104     // The unmarshaler thread and stopping flag
105     private Thread unmarshallerThread = null;
106     private boolean stopOrderedFlag = false;
107
108     /**
109      * Create the unmarshaler.
110      *
111      * @param name the name of the unmarshaler
112      * @param engineServiceParameters the engine service parameters for this Apex engine
113      * @param consumerParameters the consumer parameters for this specific unmarshaler
114      */
115     public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
116         final EventHandlerParameters consumerParameters) {
117         this.name = name;
118         this.engineServiceParameters = engineServiceParameters;
119         this.consumerParameters = consumerParameters;
120     }
121
122     /**
123      * Configure the consumer and initialize the thread for event sending.
124      *
125      * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
126      * @throws ApexEventException on errors initializing event handling
127      */
128     public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
129         this.engineServiceHandler = incomingEngineServiceHandler;
130
131         // Create the consumer for sending events and the converter for transforming events
132         consumer = new EventConsumerFactory().createConsumer(name, consumerParameters);
133         consumer.init(this.name, this.consumerParameters, this);
134
135         converter = new EventProtocolFactory().createConverter(name, consumerParameters.getEventProtocolParameters());
136     }
137
138     /**
139      * Start the unmarshaler and consumer threads.
140      */
141     public void start() {
142         // Start the consumer
143         consumer.start();
144
145         // Configure and start the event reception thread
146         final String threadName =
147             engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
148         unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
149         unmarshallerThread.setDaemon(true);
150         unmarshallerThread.start();
151     }
152
153     /**
154      * Connect a synchronous unmarshaler with a synchronous marshaler.
155      *
156      * @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
157      * @param peeredMarshaller the synchronous marshaler to connect with
158      */
159     public void connectMarshaler(final EventHandlerPeeredMode peeredMode, final ApexEventMarshaller peeredMarshaller) {
160         switch (peeredMode) {
161             case SYNCHRONOUS:
162                 // To connect a synchronous unmarshaler and marshaler, we create a synchronous event
163                 // cache on the consumer/producer pair
164                 new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
165                     consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
166                 return;
167
168             case REQUESTOR:
169                 new PeeredReference(peeredMode, consumer, peeredMarshaller.getProducer());
170                 return;
171
172             default:
173                 return;
174         }
175     }
176
177     /**
178      * {@inheritDoc}.
179      */
180     @Override
181     public void receiveEvent(@NonNull final Properties executionProperties, final Object event)
182         throws ApexEventException {
183         receiveEvent(0, executionProperties, event, true);
184     }
185
186     /**
187      * {@inheritDoc}.
188      */
189     @Override
190     public void receiveEvent(final long executionId, @NonNull final Properties executionProperties, final Object event)
191         throws ApexEventException {
192         receiveEvent(executionId, executionProperties, event, false);
193     }
194
195     /**
196      * Receive an event from a consumer, convert its protocol and forward it to Apex.
197      *
198      * @param executionId the execution id the incoming execution ID
199      * @param executionProperties properties used during processing of this event
200      * @param event the event in its native format
201      * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
202      * @throws ApexEventException on unmarshaling errors on events
203      */
204     private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
205         final boolean generateExecutionId) throws ApexEventException {
206         // Push the event onto the queue
207         if (LOGGER.isTraceEnabled()) {
208             var eventString = "onMessage(): event received: " + event.toString();
209             LOGGER.trace(eventString);
210         }
211
212         // Convert the incoming events to Apex events
213         List<ApexEvent> apexEventList = convertToApexEvents(event);
214
215         for (final ApexEvent apexEvent : apexEventList) {
216             // Check if this event is filtered out by the incoming filter
217             if (isEventFilteredOut(apexEvent)) {
218                 // Ignore this event
219                 continue;
220             }
221             if (!generateExecutionId) {
222                 apexEvent.setExecutionId(executionId);
223                 apexEvent.setExecutionProperties(executionProperties);
224             } else {
225                 // Clean up executionProperties in case if it is not a response event to a request made from APEX
226                 apexEvent.setExecutionProperties(new Properties(executionProperties));
227             }
228             // Cache synchronized events that are sent
229             if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
230                 final var synchronousEventCache =
231                     (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
232                 synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
233             }
234
235             // Update policy execution metrics
236             updatePolicyExecutedMetrics(apexEvent.getToscaPolicyState());
237
238             // Enqueue the event
239             queue.add(apexEvent);
240         }
241     }
242
243     /**
244      * Increment Prometheus counters for TOSCA policy execution metrics.
245      *
246      * @param toscaPolicyState the TOSCA Policy state flag from the event
247      */
248     private void updatePolicyExecutedMetrics(String toscaPolicyState) {
249         // Skip events that are not flagged as TOSCA processing entry or exit points.
250         if (Strings.isNullOrEmpty(toscaPolicyState)
251                 || !EnumUtils.isValidEnum(AxToscaPolicyProcessingStatus.class, toscaPolicyState)) {
252             return;
253         }
254
255         // Increment total, successful and failed policy executed counter.
256         if (AxToscaPolicyProcessingStatus.ENTRY.name().equals(toscaPolicyState)) {
257             POLICY_EXECUTED_COUNTER.labels(PROMETHEUS_TOTAL_LABEL_VALUE).inc();
258         } else if (AxToscaPolicyProcessingStatus.EXIT_SUCCESS.name().equals(toscaPolicyState)) {
259             POLICY_EXECUTED_COUNTER.labels(PdpResponseStatus.SUCCESS.name()).inc();
260         } else if (AxToscaPolicyProcessingStatus.EXIT_FAILURE.name().equals(toscaPolicyState)) {
261             POLICY_EXECUTED_COUNTER.labels(PdpResponseStatus.FAIL.name()).inc();
262         }
263     }
264
265     private List<ApexEvent> convertToApexEvents(final Object event) throws ApexEventException {
266         List<ApexEvent> apexEventList = null;
267         List<String> eventNamesList = null;
268         if (consumerParameters.getEventName() != null) {
269             eventNamesList = Arrays.asList(consumerParameters.getEventName().split("\\|"));
270         } else {
271             eventNamesList = Collections.singletonList(null);
272         }
273         Iterator<String> iterator = eventNamesList.iterator();
274         // Incoming events in an endpoint can have different structure , for e.g., success/failure response events
275         // Parse the incoming event into an APEX event defined with any one of the names specified in eventName field
276         while (iterator.hasNext()) {
277             try {
278                 String eventName = iterator.next();
279                 apexEventList = converter.toApexEvent(eventName, event);
280                 break;
281             } catch (ApexException e) {
282                 if (!iterator.hasNext()) {
283                     final String errorMessage = "Error while converting event into an ApexEvent for " + name;
284                     if (!LOGGER.isDebugEnabled()) {
285                         LOGGER.warn("{}. Detailed logs are available at debug level.", errorMessage);
286                     }
287                     throw new ApexEventException(errorMessage, e);
288                 }
289             }
290         }
291         return apexEventList;
292     }
293
294     /**
295      * Check if an event is filtered out and ignored.
296      *
297      * @param apexEvent the event to check
298      */
299     private boolean isEventFilteredOut(final ApexEvent apexEvent) {
300         // Check if we are filtering events on this unmarshaler, if so check the event name
301         // against the filter
302         if (consumerParameters.isSetEventNameFilter()
303             && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
304
305             if (LOGGER.isTraceEnabled()) {
306                 LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter", apexEvent,
307                     consumerParameters.getEventNameFilter());
308             }
309
310             return true;
311         } else {
312             return false;
313         }
314     }
315
316     /**
317      * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
318      * queue.
319      */
320     @Override
321     public void run() {
322         // Run until interruption
323         while (unmarshallerThread.isAlive() && !stopOrderedFlag) {
324             try {
325                 // Take the next event from the queue
326                 final var apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
327                 if (apexEvent == null) {
328                     continue;
329                 }
330
331                 if (LOGGER.isTraceEnabled()) {
332                     var message = apexEvent.toString();
333                     LOGGER.trace("event received {}", message);
334                 }
335
336                 // Pass the event to the activator for forwarding to Apex
337                 engineServiceHandler.forwardEvent(apexEvent);
338             } catch (final InterruptedException e) {
339                 // restore the interrupt status
340                 Thread.currentThread().interrupt();
341                 LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
342                 stopOrderedFlag = true;
343             } catch (final Exception e) {
344                 LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
345             }
346         }
347
348         // Stop event production
349         consumer.stop();
350     }
351
352     /**
353      * Get the unmarshaler thread.
354      *
355      * @return the unmarshaler thread
356      */
357     public Thread getThread() {
358         return unmarshallerThread;
359     }
360
361     /**
362      * Stop the Apex event unmarshaller's event producer using its termination mechanism.
363      */
364     public void stop() {
365         LOGGER.entry("shutting down Apex event unmarshaller . . .");
366
367         // Order the stop
368         stopOrderedFlag = true;
369
370         // Wait for thread shutdown
371         while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
372             ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
373         }
374
375         // Order a stop on the synchronous cache if one exists
376         if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
377             && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
378             ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
379         }
380         LOGGER.exit("shut down Apex event unmarshaller");
381     }
382 }