Sonar Fixes
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / event / SynchronousEventCache.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.event;
23
24 import java.util.AbstractMap.SimpleEntry;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.Set;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
32 import org.slf4j.ext.XLogger;
33 import org.slf4j.ext.XLoggerFactory;
34
35 /**
36  * This class holds a cache of the synchronous events sent into Apex and that have not yet been replied to. It runs a
37  * thread to time out events that have not been replied to in the specified timeout.
38  *
39  * @author Liam Fallon (liam.fallon@ericsson.com)
40  */
41 public class SynchronousEventCache extends PeeredReference implements Runnable {
42     // Get a reference to the logger
43     private static final XLogger LOGGER = XLoggerFactory.getXLogger(SynchronousEventCache.class);
44
45     // The default amount of time to wait for a synchronous event to be replied to is 1 second
46     private static final long DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT = 1000;
47
48     // The timeout to wait between event polls in milliseconds and the time to wait for the thread
49     // to stop
50     private static final long OUTSTANDING_EVENT_POLL_TIMEOUT = 50;
51     private static final long CACHE_STOP_WAIT_INTERVAL = 10;
52
53     // The time in milliseconds to wait for the reply to a sent synchronous event
54     private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
55
56     // Map holding outstanding synchronous events
57     private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<>();
58
59     // Map holding reply events
60     private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap = new HashMap<>();
61
62     // The message listener thread and stopping flag
63     private final Thread synchronousEventCacheThread;
64     private boolean stopOrderedFlag = false;
65
66     /**
67      * Create a synchronous event cache that caches outstanding synchronous Apex events.
68      *
69      * @param peeredMode the peered mode for which to return the reference
70      * @param consumer the consumer that is populating the cache
71      * @param producer the producer that is emptying the cache
72      * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent synchronous event
73      */
74     public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer,
75                     final ApexEventProducer producer, final long synchronousEventTimeout) {
76         super(peeredMode, consumer, producer);
77
78         if (synchronousEventTimeout != 0) {
79             this.synchronousEventTimeout = synchronousEventTimeout;
80         } else {
81             this.synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
82         }
83
84         // Start scanning the outstanding events
85         synchronousEventCacheThread = new Thread(this);
86         synchronousEventCacheThread.setDaemon(true);
87         synchronousEventCacheThread.start();
88     }
89
90     /**
91      * Gets the timeout value for synchronous events.
92      *
93      * @return the synchronous event timeout
94      */
95     public long getSynchronousEventTimeout() {
96         return synchronousEventTimeout;
97     }
98
99     /**
100      * Cache a synchronized event sent into Apex in the event cache.
101      *
102      * @param executionId the execution ID that was assigned to the event
103      * @param event the apex event
104      */
105     public void cacheSynchronizedEventToApex(final long executionId, final Object event) {
106         // Add the event to the map
107         synchronized (toApexEventMap) {
108             cacheSynchronizedEvent(toApexEventMap, executionId, event);
109         }
110     }
111
112     /**
113      * Remove the record of an event sent to Apex if it exists in the cache.
114      *
115      * @param executionId the execution ID of the event
116      * @return The removed event
117      */
118     public Object removeCachedEventToApexIfExists(final long executionId) {
119         synchronized (toApexEventMap) {
120             return removeCachedEventIfExists(toApexEventMap, executionId);
121         }
122     }
123
124     /**
125      * Check if an event exists in the to apex cache.
126      *
127      * @param executionId the execution ID of the event
128      * @return true if the event exists, false otherwise
129      */
130     public boolean existsEventToApex(final long executionId) {
131         synchronized (toApexEventMap) {
132             return toApexEventMap.containsKey(executionId);
133         }
134     }
135
136     /**
137      * Cache synchronized event received from Apex in the event cache.
138      *
139      * @param executionId the execution ID of the event
140      * @param event the apex event
141      */
142     public void cacheSynchronizedEventFromApex(final long executionId, final Object event) {
143         // Add the event to the map
144         synchronized (fromApexEventMap) {
145             cacheSynchronizedEvent(fromApexEventMap, executionId, event);
146         }
147     }
148
149     /**
150      * Remove the record of an event received from Apex if it exists in the cache.
151      *
152      * @param executionId the execution ID of the event
153      * @return The removed event
154      */
155     public Object removeCachedEventFromApexIfExists(final long executionId) {
156         synchronized (fromApexEventMap) {
157             return removeCachedEventIfExists(fromApexEventMap, executionId);
158         }
159     }
160
161     /**
162      * Check if an event exists in the from apex cache.
163      *
164      * @param executionId the execution ID of the event
165      * @return true if the event exists, false otherwise
166      */
167     public boolean existsEventFromApex(final long executionId) {
168         synchronized (fromApexEventMap) {
169             return fromApexEventMap.containsKey(executionId);
170         }
171     }
172
173     /**
174      * {@inheritDoc}.
175      */
176     @Override
177     public void run() {
178         LOGGER.entry();
179
180         // Periodic scan of outstanding events
181         while (synchronousEventCacheThread.isAlive() && !stopOrderedFlag) {
182             ThreadUtilities.sleep(OUTSTANDING_EVENT_POLL_TIMEOUT);
183
184             // Check for timeouts on events
185             synchronized (toApexEventMap) {
186                 timeoutEventsOnCache(toApexEventMap);
187             }
188             synchronized (fromApexEventMap) {
189                 timeoutEventsOnCache(fromApexEventMap);
190             }
191         }
192
193         LOGGER.exit();
194     }
195
196     /**
197      * Stops the scanning thread and clears the cache.
198      */
199     public synchronized void stop() {
200         LOGGER.entry();
201         stopOrderedFlag = true;
202
203         while (synchronousEventCacheThread.isAlive()) {
204             ThreadUtilities.sleep(CACHE_STOP_WAIT_INTERVAL);
205         }
206
207         // Check if there are any unprocessed events
208         if (!toApexEventMap.isEmpty()) {
209             String message = toApexEventMap.size() + " synchronous events dropped due to system shutdown";
210             LOGGER.warn(message);
211         }
212
213         toApexEventMap.clear();
214         LOGGER.exit();
215     }
216
217     /**
218      * Cache a synchronized event sent in an event cache.
219      *
220      * @param eventCacheMap the map to cache the event on
221      * @param executionId the execution ID of the event
222      * @param event the event to cache
223      */
224     private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
225                     final long executionId, final Object event) {
226         LOGGER.entry("Adding event with execution ID: " + executionId);
227
228         // Check if the event is already in the cache
229         if (eventCacheMap.containsKey(executionId)) {
230             // If there was no sent event then the event timed out or some unexpected event was
231             // received
232             final String errorMessage = "an event with ID " + executionId
233                             + " already exists in the synchronous event cache, "
234                             + "execution IDs must be unique in the system";
235             LOGGER.warn(errorMessage);
236             throw new ApexEventRuntimeException(errorMessage);
237         }
238
239         // Add the event to the map
240         eventCacheMap.put(executionId, new SimpleEntry<>(System.currentTimeMillis(), event));
241
242         if (LOGGER.isDebugEnabled()) {
243             String message = "event has been cached:" + event;
244             LOGGER.debug(message);
245         }
246
247         LOGGER.exit("Added: " + executionId);
248     }
249
250     /**
251      * Remove the record of an event if it exists in the cache.
252      *
253      * @param eventCacheMap the map to remove the event from
254      * @param executionId the execution ID of the event
255      * @return The removed event
256      */
257     private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
258                     final long executionId) {
259         LOGGER.entry("Removing: " + executionId);
260
261         final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId);
262
263         if (removedEventEntry != null) {
264             LOGGER.exit("Removed: " + executionId);
265             return removedEventEntry.getValue();
266         } else {
267             // The event may not be one of the events in our cache, so we just ignore removal
268             // failures
269             return null;
270         }
271     }
272
273     /**
274      * Time out events on an event cache map. Events that have a timeout longer than the configured timeout are timed
275      * out.
276      *
277      * @param eventCacheMap the event cache to operate on
278      */
279     private void timeoutEventsOnCache(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap) {
280         // Use a set to keep track of the events that have timed out
281         final Set<Long> timedOutEventSet = new HashSet<>();
282
283         for (final Entry<Long, SimpleEntry<Long, Object>> cachedEventEntry : eventCacheMap.entrySet()) {
284             // The amount of time we are waiting for the event reply
285             final long eventWaitTime = System.currentTimeMillis() - cachedEventEntry.getValue().getKey();
286
287             // Have we a timeout?
288             if (eventWaitTime > synchronousEventTimeout) {
289                 timedOutEventSet.add(cachedEventEntry.getKey());
290             }
291         }
292
293         // Remove timed out events from the map
294         for (final long timedoutEventExecutionId : timedOutEventSet) {
295             // Remove the map entry and issue a warning
296             final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionId);
297
298             String message = "synchronous event timed out, reply not received in " + synchronousEventTimeout
299                             + " milliseconds on event " + timedOutEventEntry.getValue();
300             LOGGER.warn(message);
301         }
302     }
303 }