2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.event;
23 import java.util.AbstractMap.SimpleEntry;
24 import java.util.HashMap;
25 import java.util.HashSet;
27 import java.util.Map.Entry;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
32 import org.slf4j.ext.XLogger;
33 import org.slf4j.ext.XLoggerFactory;
36 * This class holds a cache of the synchronous events sent into Apex and that have not yet been
37 * replied to. It runs a thread to time out events that have not been replied to in the specified
40 * @author Liam Fallon (liam.fallon@ericsson.com)
42 public class SynchronousEventCache extends PeeredReference implements Runnable {
43 // Get a reference to the logger
44 private static final XLogger LOGGER = XLoggerFactory.getXLogger(SynchronousEventCache.class);
46 // The default amount of time to wait for a synchronous event to be replied to is 1 second
47 private static final long DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT = 1000;
49 // The timeout to wait between event polls in milliseconds and the time to wait for the thread
51 private static final long OUTSTANDING_EVENT_POLL_TIMEOUT = 50;
52 private static final long CACHE_STOP_WAIT_INTERVAL = 10;
54 // The time in milliseconds to wait for the reply to a sent synchronous event
55 private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
57 // Map holding outstanding synchronous events
58 private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>();
60 // Map holding reply events
61 private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap =
62 new HashMap<Long, SimpleEntry<Long, Object>>();
64 // The message listener thread and stopping flag
65 private final Thread synchronousEventCacheThread;
66 private boolean stopOrderedFlag = false;
69 * Create a synchronous event cache that caches outstanding synchronous Apex events.
71 * @param peeredMode the peered mode for which to return the reference
72 * @param consumer the consumer that is populating the cache
73 * @param producer the producer that is emptying the cache
74 * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent
77 public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer,
78 final ApexEventProducer producer, final long synchronousEventTimeout) {
79 super(peeredMode, consumer, producer);
81 if (synchronousEventTimeout != 0) {
82 this.synchronousEventTimeout = synchronousEventTimeout;
84 this.synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
87 // Start scanning the outstanding events
88 synchronousEventCacheThread = new Thread(this);
89 synchronousEventCacheThread.setDaemon(true);
90 synchronousEventCacheThread.start();
94 * Gets the timeout value for synchronous events.
96 * @return the synchronous event timeout
98 public long getSynchronousEventTimeout() {
99 return synchronousEventTimeout;
103 * Cache a synchronized event sent into Apex in the event cache.
105 * @param executionId the execution ID that was assigned to the event
106 * @param event the apex event
108 public void cacheSynchronizedEventToApex(final long executionId, final Object event) {
109 // Add the event to the map
110 synchronized (toApexEventMap) {
111 cacheSynchronizedEvent(toApexEventMap, executionId, event);
116 * Remove the record of an event sent to Apex if it exists in the cache.
118 * @param executionId the execution ID of the event
119 * @return The removed event
121 public Object removeCachedEventToApexIfExists(final long executionId) {
122 synchronized (toApexEventMap) {
123 return removeCachedEventIfExists(toApexEventMap, executionId);
128 * Check if an event exists in the to apex cache.
130 * @param executionId the execution ID of the event
131 * @return true if the event exists, false otherwise
133 public boolean existsEventToApex(final long executionId) {
134 synchronized (toApexEventMap) {
135 return toApexEventMap.containsKey(executionId);
140 * Cache synchronized event received from Apex in the event cache.
142 * @param executionId the execution ID of the event
143 * @param event the apex event
145 public void cacheSynchronizedEventFromApex(final long executionId, final Object event) {
146 // Add the event to the map
147 synchronized (fromApexEventMap) {
148 cacheSynchronizedEvent(fromApexEventMap, executionId, event);
153 * Remove the record of an event received from Apex if it exists in the cache.
155 * @param executionId the execution ID of the event
156 * @return The removed event
158 public Object removeCachedEventFromApexIfExists(final long executionId) {
159 synchronized (fromApexEventMap) {
160 return removeCachedEventIfExists(fromApexEventMap, executionId);
165 * Check if an event exists in the from apex cache.
167 * @param executionId the execution ID of the event
168 * @return true if the event exists, false otherwise
170 public boolean existsEventFromApex(final long executionId) {
171 synchronized (fromApexEventMap) {
172 return fromApexEventMap.containsKey(executionId);
179 * @see java.lang.Runnable#run()
185 // Periodic scan of outstanding events
186 while (synchronousEventCacheThread.isAlive() && !stopOrderedFlag) {
187 ThreadUtilities.sleep(OUTSTANDING_EVENT_POLL_TIMEOUT);
189 // Check for timeouts on events
190 synchronized (toApexEventMap) {
191 timeoutEventsOnCache(toApexEventMap);
193 synchronized (fromApexEventMap) {
194 timeoutEventsOnCache(fromApexEventMap);
202 * Stops the scanning thread and clears the cache.
204 public synchronized void stop() {
206 stopOrderedFlag = true;
208 while (synchronousEventCacheThread.isAlive()) {
209 ThreadUtilities.sleep(CACHE_STOP_WAIT_INTERVAL);
212 // Check if there are any unprocessed events
213 if (!toApexEventMap.isEmpty()) {
214 LOGGER.warn(toApexEventMap.size() + " synchronous events dropped due to system shutdown");
217 toApexEventMap.clear();
222 * Cache a synchronized event sent in an event cache.
224 * @param eventCacheMap the map to cache the event on
225 * @param executionId the execution ID of the event
226 * @param event the event to cache
228 private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
229 final long executionId, final Object event) {
230 LOGGER.entry("Adding event with execution ID: " + executionId);
232 // Check if the event is already in the cache
233 if (eventCacheMap.containsKey(executionId)) {
234 // If there was no sent event then the event timed out or some unexpected event was
236 final String errorMessage = "an event with ID " + executionId
237 + " already exists in the synchronous event cache, execution IDs must be unique in the system";
238 LOGGER.warn(errorMessage);
239 throw new ApexEventRuntimeException(errorMessage);
242 // Add the event to the map
243 eventCacheMap.put(executionId, new SimpleEntry<Long, Object>(System.currentTimeMillis(), event));
245 if (LOGGER.isDebugEnabled()) {
246 LOGGER.debug("event has been cached:" + event);
249 LOGGER.exit("Added: " + executionId);
253 * Remove the record of an event if it exists in the cache.
255 * @param eventCacheMap the map to remove the event from
256 * @param executionId the execution ID of the event
257 * @return The removed event
259 private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
260 final long executionId) {
261 LOGGER.entry("Removing: " + executionId);
263 final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId);
265 if (removedEventEntry != null) {
266 LOGGER.exit("Removed: " + executionId);
267 return removedEventEntry.getValue();
269 // The event may not be one of the events in our cache, so we just ignore removal
276 * Time out events on an event cache map. Events that have a timeout longer than the configured
277 * timeout are timed out.
279 * @param eventCacheMap the event cache to operate on
281 private void timeoutEventsOnCache(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap) {
282 // Use a set to keep track of the events that have timed out
283 final Set<Long> timedOutEventSet = new HashSet<>();
285 for (final Entry<Long, SimpleEntry<Long, Object>> cachedEventEntry : eventCacheMap.entrySet()) {
286 // The amount of time we are waiting for the event reply
287 final long eventWaitTime = System.currentTimeMillis() - cachedEventEntry.getValue().getKey();
289 // Have we a timeout?
290 if (eventWaitTime > synchronousEventTimeout) {
291 timedOutEventSet.add(cachedEventEntry.getKey());
295 // Remove timed out events from the map
296 for (final long timedoutEventExecutionID : timedOutEventSet) {
297 // Remove the map entry and issue a warning
298 final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionID);
300 LOGGER.warn("synchronous event timed out, reply not received in " + synchronousEventTimeout
301 + " milliseconds on event " + timedOutEventEntry.getValue());