1830fc0e5f86c0e75bc8c6119487e686d0c7c1ed
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event;
22
23 import java.util.AbstractMap.SimpleEntry;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Set;
29
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
32 import org.slf4j.ext.XLogger;
33 import org.slf4j.ext.XLoggerFactory;
34
35 /**
36  * This class holds a cache of the synchronous events sent into Apex and that have not yet been
37  * replied to. It runs a thread to time out events that have not been replied to in the specified
38  * timeout.
39  * 
40  * @author Liam Fallon (liam.fallon@ericsson.com)
41  */
42 public class SynchronousEventCache extends PeeredReference implements Runnable {
43     // Get a reference to the logger
44     private static final XLogger LOGGER = XLoggerFactory.getXLogger(SynchronousEventCache.class);
45
46     // The default amount of time to wait for a synchronous event to be replied to is 1 second
47     private static final long DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT = 1000;
48
49     // The timeout to wait between event polls in milliseconds and the time to wait for the thread
50     // to stop
51     private static final long OUTSTANDING_EVENT_POLL_TIMEOUT = 50;
52     private static final long CACHE_STOP_WAIT_INTERVAL = 10;
53
54     // The time in milliseconds to wait for the reply to a sent synchronous event
55     private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
56
57     // Map holding outstanding synchronous events
58     private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>();
59
60     // Map holding reply events
61     private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap =
62             new HashMap<Long, SimpleEntry<Long, Object>>();
63
64     // The message listener thread and stopping flag
65     private final Thread synchronousEventCacheThread;
66     private boolean stopOrderedFlag = false;
67
68     /**
69      * Create a synchronous event cache that caches outstanding synchronous Apex events.
70      * 
71      * @param peeredMode the peered mode for which to return the reference
72      * @param consumer the consumer that is populating the cache
73      * @param producer the producer that is emptying the cache
74      * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent
75      *        synchronous event
76      */
77     public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer,
78             final ApexEventProducer producer, final long synchronousEventTimeout) {
79         super(peeredMode, consumer, producer);
80
81         if (synchronousEventTimeout != 0) {
82             this.synchronousEventTimeout = synchronousEventTimeout;
83         } else {
84             this.synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
85         }
86
87         // Start scanning the outstanding events
88         synchronousEventCacheThread = new Thread(this);
89         synchronousEventCacheThread.setDaemon(true);
90         synchronousEventCacheThread.start();
91     }
92
93     /**
94      * Gets the timeout value for synchronous events.
95      *
96      * @return the synchronous event timeout
97      */
98     public long getSynchronousEventTimeout() {
99         return synchronousEventTimeout;
100     }
101
102     /**
103      * Cache a synchronized event sent into Apex in the event cache.
104      *
105      * @param executionId the execution ID that was assigned to the event
106      * @param event the apex event
107      */
108     public void cacheSynchronizedEventToApex(final long executionId, final Object event) {
109         // Add the event to the map
110         synchronized (toApexEventMap) {
111             cacheSynchronizedEvent(toApexEventMap, executionId, event);
112         }
113     }
114
115     /**
116      * Remove the record of an event sent to Apex if it exists in the cache.
117      * 
118      * @param executionId the execution ID of the event
119      * @return The removed event
120      */
121     public Object removeCachedEventToApexIfExists(final long executionId) {
122         synchronized (toApexEventMap) {
123             return removeCachedEventIfExists(toApexEventMap, executionId);
124         }
125     }
126
127     /**
128      * Check if an event exists in the to apex cache.
129      * 
130      * @param executionId the execution ID of the event
131      * @return true if the event exists, false otherwise
132      */
133     public boolean existsEventToApex(final long executionId) {
134         synchronized (toApexEventMap) {
135             return toApexEventMap.containsKey(executionId);
136         }
137     }
138
139     /**
140      * Cache synchronized event received from Apex in the event cache.
141      *
142      * @param executionId the execution ID of the event
143      * @param event the apex event
144      */
145     public void cacheSynchronizedEventFromApex(final long executionId, final Object event) {
146         // Add the event to the map
147         synchronized (fromApexEventMap) {
148             cacheSynchronizedEvent(fromApexEventMap, executionId, event);
149         }
150     }
151
152     /**
153      * Remove the record of an event received from Apex if it exists in the cache.
154      * 
155      * @param executionId the execution ID of the event
156      * @return The removed event
157      */
158     public Object removeCachedEventFromApexIfExists(final long executionId) {
159         synchronized (fromApexEventMap) {
160             return removeCachedEventIfExists(fromApexEventMap, executionId);
161         }
162     }
163
164     /**
165      * Check if an event exists in the from apex cache.
166      * 
167      * @param executionId the execution ID of the event
168      * @return true if the event exists, false otherwise
169      */
170     public boolean existsEventFromApex(final long executionId) {
171         synchronized (fromApexEventMap) {
172             return fromApexEventMap.containsKey(executionId);
173         }
174     }
175
176     /*
177      * (non-Javadoc)
178      * 
179      * @see java.lang.Runnable#run()
180      */
181     @Override
182     public void run() {
183         LOGGER.entry();
184
185         // Periodic scan of outstanding events
186         while (synchronousEventCacheThread.isAlive() && !stopOrderedFlag) {
187             ThreadUtilities.sleep(OUTSTANDING_EVENT_POLL_TIMEOUT);
188
189             // Check for timeouts on events
190             synchronized (toApexEventMap) {
191                 timeoutEventsOnCache(toApexEventMap);
192             }
193             synchronized (fromApexEventMap) {
194                 timeoutEventsOnCache(fromApexEventMap);
195             }
196         }
197
198         LOGGER.exit();
199     }
200
201     /**
202      * Stops the scanning thread and clears the cache.
203      */
204     public synchronized void stop() {
205         LOGGER.entry();
206         stopOrderedFlag = true;
207
208         while (synchronousEventCacheThread.isAlive()) {
209             ThreadUtilities.sleep(CACHE_STOP_WAIT_INTERVAL);
210         }
211
212         // Check if there are any unprocessed events
213         if (!toApexEventMap.isEmpty()) {
214             LOGGER.warn(toApexEventMap.size() + " synchronous events dropped due to system shutdown");
215         }
216
217         toApexEventMap.clear();
218         LOGGER.exit();
219     }
220
221     /**
222      * Cache a synchronized event sent in an event cache.
223      * 
224      * @param eventCacheMap the map to cache the event on
225      * @param executionId the execution ID of the event
226      * @param event the event to cache
227      */
228     private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
229             final long executionId, final Object event) {
230         LOGGER.entry("Adding event with execution ID: " + executionId);
231
232         // Check if the event is already in the cache
233         if (eventCacheMap.containsKey(executionId)) {
234             // If there was no sent event then the event timed out or some unexpected event was
235             // received
236             final String errorMessage = "an event with ID " + executionId
237                     + " already exists in the synchronous event cache, execution IDs must be unique in the system";
238             LOGGER.warn(errorMessage);
239             throw new ApexEventRuntimeException(errorMessage);
240         }
241
242         // Add the event to the map
243         eventCacheMap.put(executionId, new SimpleEntry<Long, Object>(System.currentTimeMillis(), event));
244
245         if (LOGGER.isDebugEnabled()) {
246             LOGGER.debug("event has been cached:" + event);
247         }
248
249         LOGGER.exit("Added: " + executionId);
250     }
251
252     /**
253      * Remove the record of an event if it exists in the cache.
254      * 
255      * @param eventCacheMap the map to remove the event from
256      * @param executionId the execution ID of the event
257      * @return The removed event
258      */
259     private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
260             final long executionId) {
261         LOGGER.entry("Removing: " + executionId);
262
263         final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId);
264
265         if (removedEventEntry != null) {
266             LOGGER.exit("Removed: " + executionId);
267             return removedEventEntry.getValue();
268         } else {
269             // The event may not be one of the events in our cache, so we just ignore removal
270             // failures
271             return null;
272         }
273     }
274
275     /**
276      * Time out events on an event cache map. Events that have a timeout longer than the configured
277      * timeout are timed out.
278      * 
279      * @param eventCacheMap the event cache to operate on
280      */
281     private void timeoutEventsOnCache(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap) {
282         // Use a set to keep track of the events that have timed out
283         final Set<Long> timedOutEventSet = new HashSet<>();
284
285         for (final Entry<Long, SimpleEntry<Long, Object>> cachedEventEntry : eventCacheMap.entrySet()) {
286             // The amount of time we are waiting for the event reply
287             final long eventWaitTime = System.currentTimeMillis() - cachedEventEntry.getValue().getKey();
288
289             // Have we a timeout?
290             if (eventWaitTime > synchronousEventTimeout) {
291                 timedOutEventSet.add(cachedEventEntry.getKey());
292             }
293         }
294
295         // Remove timed out events from the map
296         for (final long timedoutEventExecutionID : timedOutEventSet) {
297             // Remove the map entry and issue a warning
298             final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionID);
299
300             LOGGER.warn("synchronous event timed out, reply not received in " + synchronousEventTimeout
301                     + " milliseconds on event " + timedOutEventEntry.getValue());
302         }
303     }
304 }