/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 *  Modifications Copyright (C) 2020 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.apex.service.engine.event;

import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;

36 * This class holds a cache of the synchronous events sent into Apex and that have not yet been replied to. It runs a
37 * thread to time out events that have not been replied to in the specified timeout.
39 * @author Liam Fallon (liam.fallon@ericsson.com)
41 public class SynchronousEventCache extends PeeredReference implements Runnable {
42 // Get a reference to the logger
43 private static final XLogger LOGGER = XLoggerFactory.getXLogger(SynchronousEventCache.class);
45 // The default amount of time to wait for a synchronous event to be replied to is 1 second
46 private static final long DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT = 1000;
48 // The timeout to wait between event polls in milliseconds and the time to wait for the thread
50 private static final long OUTSTANDING_EVENT_POLL_TIMEOUT = 50;
51 private static final long CACHE_STOP_WAIT_INTERVAL = 10;
53 // The time in milliseconds to wait for the reply to a sent synchronous event
54 private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
56 // Map holding outstanding synchronous events
57 private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<>();
59 // Map holding reply events
60 private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap = new HashMap<>();
62 // The message listener thread and stopping flag
63 private final Thread synchronousEventCacheThread;
64 private boolean stopOrderedFlag = false;
67 * Create a synchronous event cache that caches outstanding synchronous Apex events.
69 * @param peeredMode the peered mode for which to return the reference
70 * @param consumer the consumer that is populating the cache
71 * @param producer the producer that is emptying the cache
72 * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent synchronous event
74 public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer,
75 final ApexEventProducer producer, final long synchronousEventTimeout) {
76 super(peeredMode, consumer, producer);
78 if (synchronousEventTimeout != 0) {
79 this.synchronousEventTimeout = synchronousEventTimeout;
81 this.synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
84 // Start scanning the outstanding events
85 synchronousEventCacheThread = new Thread(this);
86 synchronousEventCacheThread.setDaemon(true);
87 synchronousEventCacheThread.start();
91 * Gets the timeout value for synchronous events.
93 * @return the synchronous event timeout
95 public long getSynchronousEventTimeout() {
96 return synchronousEventTimeout;
100 * Cache a synchronized event sent into Apex in the event cache.
102 * @param executionId the execution ID that was assigned to the event
103 * @param event the apex event
105 public void cacheSynchronizedEventToApex(final long executionId, final Object event) {
106 // Add the event to the map
107 synchronized (toApexEventMap) {
108 cacheSynchronizedEvent(toApexEventMap, executionId, event);
113 * Remove the record of an event sent to Apex if it exists in the cache.
115 * @param executionId the execution ID of the event
116 * @return The removed event
118 public Object removeCachedEventToApexIfExists(final long executionId) {
119 synchronized (toApexEventMap) {
120 return removeCachedEventIfExists(toApexEventMap, executionId);
125 * Check if an event exists in the to apex cache.
127 * @param executionId the execution ID of the event
128 * @return true if the event exists, false otherwise
130 public boolean existsEventToApex(final long executionId) {
131 synchronized (toApexEventMap) {
132 return toApexEventMap.containsKey(executionId);
137 * Cache synchronized event received from Apex in the event cache.
139 * @param executionId the execution ID of the event
140 * @param event the apex event
142 public void cacheSynchronizedEventFromApex(final long executionId, final Object event) {
143 // Add the event to the map
144 synchronized (fromApexEventMap) {
145 cacheSynchronizedEvent(fromApexEventMap, executionId, event);
150 * Remove the record of an event received from Apex if it exists in the cache.
152 * @param executionId the execution ID of the event
153 * @return The removed event
155 public Object removeCachedEventFromApexIfExists(final long executionId) {
156 synchronized (fromApexEventMap) {
157 return removeCachedEventIfExists(fromApexEventMap, executionId);
162 * Check if an event exists in the from apex cache.
164 * @param executionId the execution ID of the event
165 * @return true if the event exists, false otherwise
167 public boolean existsEventFromApex(final long executionId) {
168 synchronized (fromApexEventMap) {
169 return fromApexEventMap.containsKey(executionId);
180 // Periodic scan of outstanding events
181 while (synchronousEventCacheThread.isAlive() && !stopOrderedFlag) {
182 ThreadUtilities.sleep(OUTSTANDING_EVENT_POLL_TIMEOUT);
184 // Check for timeouts on events
185 synchronized (toApexEventMap) {
186 timeoutEventsOnCache(toApexEventMap);
188 synchronized (fromApexEventMap) {
189 timeoutEventsOnCache(fromApexEventMap);
197 * Stops the scanning thread and clears the cache.
199 public synchronized void stop() {
201 stopOrderedFlag = true;
203 while (synchronousEventCacheThread.isAlive()) {
204 ThreadUtilities.sleep(CACHE_STOP_WAIT_INTERVAL);
207 // Check if there are any unprocessed events
208 if (!toApexEventMap.isEmpty()) {
209 String message = toApexEventMap.size() + " synchronous events dropped due to system shutdown";
210 LOGGER.warn(message);
213 toApexEventMap.clear();
218 * Cache a synchronized event sent in an event cache.
220 * @param eventCacheMap the map to cache the event on
221 * @param executionId the execution ID of the event
222 * @param event the event to cache
224 private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
225 final long executionId, final Object event) {
226 LOGGER.entry("Adding event with execution ID: " + executionId);
228 // Check if the event is already in the cache
229 if (eventCacheMap.containsKey(executionId)) {
230 // If there was no sent event then the event timed out or some unexpected event was
232 final String errorMessage = "an event with ID " + executionId
233 + " already exists in the synchronous event cache, "
234 + "execution IDs must be unique in the system";
235 LOGGER.warn(errorMessage);
236 throw new ApexEventRuntimeException(errorMessage);
239 // Add the event to the map
240 eventCacheMap.put(executionId, new SimpleEntry<>(System.currentTimeMillis(), event));
242 if (LOGGER.isDebugEnabled()) {
243 String message = "event has been cached:" + event;
244 LOGGER.debug(message);
247 LOGGER.exit("Added: " + executionId);
251 * Remove the record of an event if it exists in the cache.
253 * @param eventCacheMap the map to remove the event from
254 * @param executionId the execution ID of the event
255 * @return The removed event
257 private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
258 final long executionId) {
259 LOGGER.entry("Removing: " + executionId);
261 final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId);
263 if (removedEventEntry != null) {
264 LOGGER.exit("Removed: " + executionId);
265 return removedEventEntry.getValue();
267 // The event may not be one of the events in our cache, so we just ignore removal
274 * Time out events on an event cache map. Events that have a timeout longer than the configured timeout are timed
277 * @param eventCacheMap the event cache to operate on
279 private void timeoutEventsOnCache(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap) {
280 // Use a set to keep track of the events that have timed out
281 final Set<Long> timedOutEventSet = new HashSet<>();
283 for (final Entry<Long, SimpleEntry<Long, Object>> cachedEventEntry : eventCacheMap.entrySet()) {
284 // The amount of time we are waiting for the event reply
285 final long eventWaitTime = System.currentTimeMillis() - cachedEventEntry.getValue().getKey();
287 // Have we a timeout?
288 if (eventWaitTime > synchronousEventTimeout) {
289 timedOutEventSet.add(cachedEventEntry.getKey());
293 // Remove timed out events from the map
294 for (final long timedoutEventExecutionId : timedOutEventSet) {
295 // Remove the map entry and issue a warning
296 final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionId);
298 String message = "synchronous event timed out, reply not received in " + synchronousEventTimeout
299 + " milliseconds on event " + timedOutEventEntry.getValue();
300 LOGGER.warn(message);