be713ff20a5fb9480cac138c79381795c2fcc089
[aai/spike.git] / src / main / java / org / onap / aai / spike / event / incoming / OffsetManager.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.spike.event.incoming;
22
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
28 import org.onap.aai.cl.api.Logger;
29 import org.onap.aai.cl.eelf.LoggerFactory;
30 import org.onap.aai.spike.exception.SpikeException;
31 import org.onap.aai.spike.logging.SpikeMsgs;
32
33
34 /**
35  * Instances of this class maintain a buffer of events which have been received and are queued up to
36  * be processed.
37  * <p>
38  * A background thread advances a pointer into the buffer which always points to the head of the
39  * most recent consecutive block of processed events. This allows us, at any time, to know what
40  * offset value can be safely committed to the event store (meaning any events before that offset
41  * into the event topic will not be reprocessed on a restart).
42  */
43 public class OffsetManager {
44
45     /** Buffer that we use for caching 'in flight' events. */
46     private RingEntry[] ringBuffer;
47
48     /** Number of elements that can be stored in the buffer. */
49     private int bufferSize;
50
51     /** Pointer to the next free slot in the buffer. */
52     private AtomicLong writePointer = new AtomicLong(0L);
53
54     /**
55      * Pointer to the next slot in the buffer to wait to be published so that we can commit its offset.
56      */
57     private long commitPointer = 0;
58
59     /**
60      * Executor for scheduling the background task which commits offsets to the event bus.
61      */
62     private ScheduledExecutorService offsetCommitService = Executors.newScheduledThreadPool(1);
63
64     /**
65      * The next offset value which represents the head of a consecutive block of events which have been
66      * processed.
67      */
68     private Long nextOffsetToCommit = null;
69
70     private static Logger logger = LoggerFactory.getInstance().getLogger(OffsetManager.class.getName());
71
72
73     /**
74      * Creates a new instance of the offset manager.
75      * 
76      * @param bufferCapacity - The requested size of the buffer that we will use to cache offsets for
77      *        events that are waiting to be processed.
78      * @param offsetCheckPeriodMs - The period at which we will try to update what we consider to be the
79      *        next offset that can be safely committed to the event bus.
80      */
81     public OffsetManager(int bufferCapacity, long offsetCheckPeriodMs) {
82
83         // In order to make the math work nicely for our write and commit pointers, we
84         // need our buffer size to be a power of 2, so round the supplied buffer size
85         // up to ensure that it is a power of two.
86         //
87         // This way we can just keep incrementing our pointers forever without worrying
88         // about wrapping (we'll eventually roll over from LongMax to LongMin, but if the
89         // buffer size is a power of 2 then our modded physical indexes will still magically
90         // map to the next consecutive index. (Math!)
91         bufferSize = nextPowerOf2(bufferCapacity);
92
93         // Now, allocate and initialize our ring buffer.
94         ringBuffer = new RingEntry[bufferSize];
95         for (int i = 0; i < bufferSize; i++) {
96             ringBuffer[i] = new RingEntry();
97         }
98
99         // Schedule a task to commit the most recent offset value to the event library.
100         offsetCommitService.scheduleAtFixedRate(new OffsetCommitter(), offsetCheckPeriodMs, offsetCheckPeriodMs,
101                 TimeUnit.MILLISECONDS);
102
103         logger.info(SpikeMsgs.OFFSET_MANAGER_STARTED, Integer.toString(bufferSize), Long.toString(offsetCheckPeriodMs));
104     }
105
106
107     /**
108      * Logs an event with the offset manager.
109      * 
110      * @param transactionId - The transaction id associated with this event.
111      * @param commitOffset - The event bus offset associated with this event.
112      * 
113      * @return - The index into the offset manager's buffer for this event.
114      */
115     public int cacheEvent(String transactionId, long commitOffset) {
116
117         // Get the index to the next free slot in the ring...
118         int index = nextFreeSlot();
119
120         if (logger.isDebugEnabled()) {
121             logger.debug("Caching event with transaction-id: " + transactionId + " offset: " + commitOffset
122                     + " to offset manager at index: " + index);
123         }
124
125         // ...and update it with the event meta data we want to cache.
126         ringBuffer[index].setTransactionId(transactionId);
127         ringBuffer[index].setCommitOffset(commitOffset);
128
129         return index;
130     }
131
132
133     /**
134      * Marks a cached event as 'published'.
135      * 
136      * @param anIndex - The index into the event cache that we want to update.
137      * @throws SpikeException
138      */
139     public void markAsPublished(int anIndex) throws SpikeException {
140
141         // Make sure that we were supplied a valid index.
142         if ((anIndex < 0) || (anIndex > bufferSize - 1)) {
143             throw new SpikeException("Invalid index " + anIndex + " for offset manager buffer.");
144         }
145
146         // It is only valid to mark a cell as 'Published' if it is already
147         // in the 'Processing' state.
148         if (!ringBuffer[anIndex].state.compareAndSet(RingEntry.PROCESSING, RingEntry.PUBLISHED)) {
149             throw new SpikeException("Unexpected event state: " + state2String(ringBuffer[anIndex].state.get()));
150         }
151
152         if (logger.isDebugEnabled()) {
153             logger.debug("Event in offset manger buffer at index: " + anIndex + " marked as 'published'");
154         }
155     }
156
157
158     /**
159      * Marks a cached event as 'published'.
160      * 
161      * @param transactionId - The transaction id of the event we want to update.
162      * 
163      * @throws SpikeException
164      */
165     public void markAsPublished(String transactionId) throws SpikeException {
166
167         // Iterate over the ring buffer and try to find the specified transaction
168         // id.
169         for (int i = 0; i < bufferSize; i++) {
170
171             // Is this the one?
172             if (ringBuffer[i].getTransactionId() == transactionId) {
173
174                 // Found the one we are looking for!
175                 markAsPublished(i);
176                 return;
177             }
178         }
179
180         // If we made it here then we didn't find an event with the supplied transaction id.
181         throw new SpikeException("No event with transaction id: " + transactionId + " exists in offset manager buffer");
182     }
183
184
185     /**
186      * Retrieves our current view of what is the most recent offset value that can be safely committed
187      * to the event bus (meaning that all events on the topic before that offset value have been
188      * processed and shouldn't be re-consumed after a restart).
189      * 
190      * @return - The next 'safe' offset.
191      */
192     public Long getNextOffsetToCommit() {
193         return nextOffsetToCommit;
194     }
195
196
197     /**
198      * Finds the next slot in the ring which is marked as 'free'.
199      * 
200      * @return - An index into the ring buffer.
201      */
202     private int nextFreeSlot() {
203
204         int currentIndex = (int) (writePointer.getAndIncrement() % bufferSize);
205         while (!ringBuffer[currentIndex].state.compareAndSet(RingEntry.FREE, RingEntry.PROCESSING)) {
206             currentIndex = (int) (writePointer.getAndIncrement() % bufferSize);
207         }
208
209         return currentIndex;
210     }
211
212
213     /**
214      * Given a number, this helper method finds the next largest number that is a power of 2.
215      * 
216      * @param aNumber - The number to compute the next power of two for.
217      * 
218      * @return - The next largest power of 2 for the supplied number.
219      */
220     private int nextPowerOf2(int aNumber) {
221
222         int powerOfTwo = 1;
223         while (powerOfTwo < aNumber) {
224             powerOfTwo *= 2;
225         }
226         return powerOfTwo;
227     }
228
229     private String state2String(int aState) {
230
231         switch (aState) {
232             case RingEntry.FREE:
233                 return "FREE";
234
235             case RingEntry.PROCESSING:
236                 return "PROCESSING";
237
238             case RingEntry.PUBLISHED:
239                 return "PUBLISHED";
240
241             default:
242                 return "UNDEFINED(" + aState + ")";
243         }
244     }
245
246
247     /**
248      * Defines the structure of the entries in the ring buffer which represent events which are 'in
249      * flight'.
250      */
251     public class RingEntry {
252
253         private final static int FREE = 1; // Slot in buffer is available to be written to.
254         private final static int PROCESSING = 2; // Slot in buffer represents an event which is waiting to be processed.
255         private final static int PUBLISHED = 3; // Slot in buffer represents an event which has been published.
256
257         /**
258          * Describes the state of this entry in the ring:
259          * <p>
260          * FREE = This slot is currently unused and may be written to.
261          * <p>
262          * PROCESSING = This slot describes an event which has not yet been published.
263          * <p>
264          * PUBLISHED = This lot describes an event which has been published and therefore may be released.
265          */
266         public AtomicInteger state = new AtomicInteger(FREE);
267
268         /** The unique identifier of the event which this entry represents. */
269         private String transactionId;
270
271         /** The event bus offset associated with the event which this entry represents. */
272         private long commitOffset;
273
274
275         /**
276          * Retrieve the transaction id for the event represented by this entry.
277          * 
278          * @return - Transaction id.
279          */
280         public String getTransactionId() {
281             return transactionId;
282         }
283
284
285         /**
286          * Assigns a transaction id to this entry.
287          * 
288          * @param transactionId - The unique id for this entry.
289          */
290         public void setTransactionId(String transactionId) {
291             this.transactionId = transactionId;
292         }
293
294
295         /**
296          * Retrieves the offset of the event represented by this entry.
297          * 
298          * @return - An event bus offset value.
299          */
300         public long getCommitOffset() {
301             return commitOffset;
302         }
303
304
305         /**
306          * Assigns an offset value to this entry.
307          * 
308          * @param commitOffset - Offset value for this entry.
309          */
310         public void setCommitOffset(long commitOffset) {
311             this.commitOffset = commitOffset;
312         }
313     }
314
315
316     /**
317      * This class implements a simple background task which wakes up periodically and determines the
318      * next available offset from the ring buffer which is safe to commit to the event bus.
319      */
320     private class OffsetCommitter implements Runnable {
321
322         /*
323          * (non-Javadoc)
324          * 
325          * @see java.lang.Runnable#run()
326          */
327         @Override
328         public void run() {
329
330             // Get the index into the ring buffer of the next slot to be checked.
331             int currentCommitIndex = (int) (commitPointer % bufferSize);
332
333             // If this entry is in the 'published' state then its offset is good to be
334             // committed.
335             while (ringBuffer[currentCommitIndex].state.get() == RingEntry.PUBLISHED) {
336
337                 // Grab the offset of the current entry.
338                 nextOffsetToCommit = ringBuffer[currentCommitIndex].getCommitOffset();
339
340                 // We don't need to keep the current entry alive any longer, so free it and advance
341                 // to the next entry in the ring.
342                 ringBuffer[currentCommitIndex].state.set(RingEntry.FREE);
343                 commitPointer++;
344
345                 // Update our index and loop back to check the next one. We will keep advancing
346                 // as long as we have consecutive entries that are flagged as 'published'.
347                 currentCommitIndex = (int) (commitPointer % bufferSize);
348             }
349
350             if (logger.isDebugEnabled()) {
351                 logger.debug("Offset to commit to event bus: "
352                         + ((nextOffsetToCommit != null) ? nextOffsetToCommit : "none"));
353             }
354         }
355     }
356 }