Changes for checkstyle 8.32
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / event / SynchronousEventCache.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event;
22
23 import java.util.AbstractMap.SimpleEntry;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Set;
29 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
30 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
31 import org.slf4j.ext.XLogger;
32 import org.slf4j.ext.XLoggerFactory;
33
34 /**
35  * This class holds a cache of the synchronous events sent into Apex and that have not yet been replied to. It runs a
36  * thread to time out events that have not been replied to in the specified timeout.
37  * 
38  * @author Liam Fallon (liam.fallon@ericsson.com)
39  */
40 public class SynchronousEventCache extends PeeredReference implements Runnable {
41     // Get a reference to the logger
42     private static final XLogger LOGGER = XLoggerFactory.getXLogger(SynchronousEventCache.class);
43
44     // The default amount of time to wait for a synchronous event to be replied to is 1 second
45     private static final long DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT = 1000;
46
47     // The timeout to wait between event polls in milliseconds and the time to wait for the thread
48     // to stop
49     private static final long OUTSTANDING_EVENT_POLL_TIMEOUT = 50;
50     private static final long CACHE_STOP_WAIT_INTERVAL = 10;
51
52     // The time in milliseconds to wait for the reply to a sent synchronous event
53     private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
54
55     // Map holding outstanding synchronous events
56     private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<>();
57
58     // Map holding reply events
59     private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap = new HashMap<>();
60
61     // The message listener thread and stopping flag
62     private final Thread synchronousEventCacheThread;
63     private boolean stopOrderedFlag = false;
64
65     /**
66      * Create a synchronous event cache that caches outstanding synchronous Apex events.
67      * 
68      * @param peeredMode the peered mode for which to return the reference
69      * @param consumer the consumer that is populating the cache
70      * @param producer the producer that is emptying the cache
71      * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent synchronous event
72      */
73     public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer,
74                     final ApexEventProducer producer, final long synchronousEventTimeout) {
75         super(peeredMode, consumer, producer);
76
77         if (synchronousEventTimeout != 0) {
78             this.synchronousEventTimeout = synchronousEventTimeout;
79         } else {
80             this.synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
81         }
82
83         // Start scanning the outstanding events
84         synchronousEventCacheThread = new Thread(this);
85         synchronousEventCacheThread.setDaemon(true);
86         synchronousEventCacheThread.start();
87     }
88
89     /**
90      * Gets the timeout value for synchronous events.
91      *
92      * @return the synchronous event timeout
93      */
94     public long getSynchronousEventTimeout() {
95         return synchronousEventTimeout;
96     }
97
98     /**
99      * Cache a synchronized event sent into Apex in the event cache.
100      *
101      * @param executionId the execution ID that was assigned to the event
102      * @param event the apex event
103      */
104     public void cacheSynchronizedEventToApex(final long executionId, final Object event) {
105         // Add the event to the map
106         synchronized (toApexEventMap) {
107             cacheSynchronizedEvent(toApexEventMap, executionId, event);
108         }
109     }
110
111     /**
112      * Remove the record of an event sent to Apex if it exists in the cache.
113      * 
114      * @param executionId the execution ID of the event
115      * @return The removed event
116      */
117     public Object removeCachedEventToApexIfExists(final long executionId) {
118         synchronized (toApexEventMap) {
119             return removeCachedEventIfExists(toApexEventMap, executionId);
120         }
121     }
122
123     /**
124      * Check if an event exists in the to apex cache.
125      * 
126      * @param executionId the execution ID of the event
127      * @return true if the event exists, false otherwise
128      */
129     public boolean existsEventToApex(final long executionId) {
130         synchronized (toApexEventMap) {
131             return toApexEventMap.containsKey(executionId);
132         }
133     }
134
135     /**
136      * Cache synchronized event received from Apex in the event cache.
137      *
138      * @param executionId the execution ID of the event
139      * @param event the apex event
140      */
141     public void cacheSynchronizedEventFromApex(final long executionId, final Object event) {
142         // Add the event to the map
143         synchronized (fromApexEventMap) {
144             cacheSynchronizedEvent(fromApexEventMap, executionId, event);
145         }
146     }
147
148     /**
149      * Remove the record of an event received from Apex if it exists in the cache.
150      * 
151      * @param executionId the execution ID of the event
152      * @return The removed event
153      */
154     public Object removeCachedEventFromApexIfExists(final long executionId) {
155         synchronized (fromApexEventMap) {
156             return removeCachedEventIfExists(fromApexEventMap, executionId);
157         }
158     }
159
160     /**
161      * Check if an event exists in the from apex cache.
162      * 
163      * @param executionId the execution ID of the event
164      * @return true if the event exists, false otherwise
165      */
166     public boolean existsEventFromApex(final long executionId) {
167         synchronized (fromApexEventMap) {
168             return fromApexEventMap.containsKey(executionId);
169         }
170     }
171
172     /**
173      * {@inheritDoc}.
174      */
175     @Override
176     public void run() {
177         LOGGER.entry();
178
179         // Periodic scan of outstanding events
180         while (synchronousEventCacheThread.isAlive() && !stopOrderedFlag) {
181             ThreadUtilities.sleep(OUTSTANDING_EVENT_POLL_TIMEOUT);
182
183             // Check for timeouts on events
184             synchronized (toApexEventMap) {
185                 timeoutEventsOnCache(toApexEventMap);
186             }
187             synchronized (fromApexEventMap) {
188                 timeoutEventsOnCache(fromApexEventMap);
189             }
190         }
191
192         LOGGER.exit();
193     }
194
195     /**
196      * Stops the scanning thread and clears the cache.
197      */
198     public synchronized void stop() {
199         LOGGER.entry();
200         stopOrderedFlag = true;
201
202         while (synchronousEventCacheThread.isAlive()) {
203             ThreadUtilities.sleep(CACHE_STOP_WAIT_INTERVAL);
204         }
205
206         // Check if there are any unprocessed events
207         if (!toApexEventMap.isEmpty()) {
208             String message = toApexEventMap.size() + " synchronous events dropped due to system shutdown";
209             LOGGER.warn(message);
210         }
211
212         toApexEventMap.clear();
213         LOGGER.exit();
214     }
215
216     /**
217      * Cache a synchronized event sent in an event cache.
218      * 
219      * @param eventCacheMap the map to cache the event on
220      * @param executionId the execution ID of the event
221      * @param event the event to cache
222      */
223     private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
224                     final long executionId, final Object event) {
225         LOGGER.entry("Adding event with execution ID: " + executionId);
226
227         // Check if the event is already in the cache
228         if (eventCacheMap.containsKey(executionId)) {
229             // If there was no sent event then the event timed out or some unexpected event was
230             // received
231             final String errorMessage = "an event with ID " + executionId
232                             + " already exists in the synchronous event cache, "
233                             + "execution IDs must be unique in the system";
234             LOGGER.warn(errorMessage);
235             throw new ApexEventRuntimeException(errorMessage);
236         }
237
238         // Add the event to the map
239         eventCacheMap.put(executionId, new SimpleEntry<Long, Object>(System.currentTimeMillis(), event));
240
241         if (LOGGER.isDebugEnabled()) {
242             String message = "event has been cached:" + event;
243             LOGGER.debug(message);
244         }
245
246         LOGGER.exit("Added: " + executionId);
247     }
248
249     /**
250      * Remove the record of an event if it exists in the cache.
251      * 
252      * @param eventCacheMap the map to remove the event from
253      * @param executionId the execution ID of the event
254      * @return The removed event
255      */
256     private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
257                     final long executionId) {
258         LOGGER.entry("Removing: " + executionId);
259
260         final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId);
261
262         if (removedEventEntry != null) {
263             LOGGER.exit("Removed: " + executionId);
264             return removedEventEntry.getValue();
265         } else {
266             // The event may not be one of the events in our cache, so we just ignore removal
267             // failures
268             return null;
269         }
270     }
271
272     /**
273      * Time out events on an event cache map. Events that have a timeout longer than the configured timeout are timed
274      * out.
275      * 
276      * @param eventCacheMap the event cache to operate on
277      */
278     private void timeoutEventsOnCache(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap) {
279         // Use a set to keep track of the events that have timed out
280         final Set<Long> timedOutEventSet = new HashSet<>();
281
282         for (final Entry<Long, SimpleEntry<Long, Object>> cachedEventEntry : eventCacheMap.entrySet()) {
283             // The amount of time we are waiting for the event reply
284             final long eventWaitTime = System.currentTimeMillis() - cachedEventEntry.getValue().getKey();
285
286             // Have we a timeout?
287             if (eventWaitTime > synchronousEventTimeout) {
288                 timedOutEventSet.add(cachedEventEntry.getKey());
289             }
290         }
291
292         // Remove timed out events from the map
293         for (final long timedoutEventExecutionId : timedOutEventSet) {
294             // Remove the map entry and issue a warning
295             final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionId);
296
297             String message = "synchronous event timed out, reply not received in " + synchronousEventTimeout
298                             + " milliseconds on event " + timedOutEventEntry.getValue();
299             LOGGER.warn(message);
300         }
301     }
302 }