2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.spike.event.incoming;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
28 import org.onap.aai.cl.api.Logger;
29 import org.onap.aai.cl.eelf.LoggerFactory;
30 import org.onap.aai.spike.exception.SpikeException;
31 import org.onap.aai.spike.logging.SpikeMsgs;
/**
 * Instances of this class maintain a buffer of events which have been received and are queued up to
 * be processed.
 * <p>
 * A background thread advances a pointer into the buffer which always points to the head of the
 * most recent consecutive block of processed events. This allows us, at any time, to know what
 * offset value can be safely committed to the event store (meaning any events before that offset
 * into the event topic will not be reprocessed on a restart).
 */
43 public class OffsetManager {
45 /** Buffer that we use for caching 'in flight' events. */
46 private RingEntry[] ringBuffer;
48 /** Number of elements that can be stored in the buffer. */
49 private int bufferSize;
51 /** Pointer to the next free slot in the buffer. */
52 private AtomicLong writePointer = new AtomicLong(0L);
55 * Pointer to the next slot in the buffer to wait to be published so that we can commit its offset.
57 private long commitPointer = 0;
60 * Executor for scheduling the background task which commits offsets to the event bus.
62 private ScheduledExecutorService offsetCommitService = Executors.newScheduledThreadPool(1);
65 * The next offset value which represents the head of a consecutive block of events which have been
68 private Long nextOffsetToCommit = null;
70 private static Logger logger = LoggerFactory.getInstance().getLogger(OffsetManager.class.getName());
74 * Creates a new instance of the offset manager.
76 * @param bufferCapacity - The requested size of the buffer that we will use to cache offsets for
77 * events that are waiting to be processed.
78 * @param offsetCheckPeriodMs - The period at which we will try to update what we consider to be the
79 * next offset that can be safely committed to the event bus.
81 public OffsetManager(int bufferCapacity, long offsetCheckPeriodMs) {
83 // In order to make the math work nicely for our write and commit pointers, we
84 // need our buffer size to be a power of 2, so round the supplied buffer size
85 // up to ensure that it is a power of two.
87 // This way we can just keep incrementing our pointers forever without worrying
88 // about wrapping (we'll eventually roll over from LongMax to LongMin, but if the
89 // buffer size is a power of 2 then our modded physical indexes will still magically
90 // map to the next consecutive index. (Math!)
91 bufferSize = nextPowerOf2(bufferCapacity);
93 // Now, allocate and initialize our ring buffer.
94 ringBuffer = new RingEntry[bufferSize];
95 for (int i = 0; i < bufferSize; i++) {
96 ringBuffer[i] = new RingEntry();
99 // Schedule a task to commit the most recent offset value to the event library.
100 offsetCommitService.scheduleAtFixedRate(new OffsetCommitter(), offsetCheckPeriodMs, offsetCheckPeriodMs,
101 TimeUnit.MILLISECONDS);
103 logger.info(SpikeMsgs.OFFSET_MANAGER_STARTED, Integer.toString(bufferSize), Long.toString(offsetCheckPeriodMs));
108 * Logs an event with the offset manager.
110 * @param transactionId - The transaction id associated with this event.
111 * @param commitOffset - The event bus offset associated with this event.
113 * @return - The index into the offset manager's buffer for this event.
115 public int cacheEvent(String transactionId, long commitOffset) {
117 // Get the index to the next free slot in the ring...
118 int index = nextFreeSlot();
120 if (logger.isDebugEnabled()) {
121 logger.debug("Caching event with transaction-id: " + transactionId + " offset: " + commitOffset
122 + " to offset manager at index: " + index);
125 // ...and update it with the event meta data we want to cache.
126 ringBuffer[index].setTransactionId(transactionId);
127 ringBuffer[index].setCommitOffset(commitOffset);
134 * Marks a cached event as 'published'.
136 * @param anIndex - The index into the event cache that we want to update.
137 * @throws SpikeException
139 public void markAsPublished(int anIndex) throws SpikeException {
141 // Make sure that we were supplied a valid index.
142 if ((anIndex < 0) || (anIndex > bufferSize - 1)) {
143 throw new SpikeException("Invalid index " + anIndex + " for offset manager buffer.");
146 // It is only valid to mark a cell as 'Published' if it is already
147 // in the 'Processing' state.
148 if (!ringBuffer[anIndex].state.compareAndSet(RingEntry.PROCESSING, RingEntry.PUBLISHED)) {
149 throw new SpikeException("Unexpected event state: " + state2String(ringBuffer[anIndex].state.get()));
152 if (logger.isDebugEnabled()) {
153 logger.debug("Event in offset manger buffer at index: " + anIndex + " marked as 'published'");
159 * Marks a cached event as 'published'.
161 * @param transactionId - The transaction id of the event we want to update.
163 * @throws SpikeException
165 public void markAsPublished(String transactionId) throws SpikeException {
167 // Iterate over the ring buffer and try to find the specified transaction
169 for (int i = 0; i < bufferSize; i++) {
172 if (ringBuffer[i].getTransactionId() == transactionId) {
174 // Found the one we are looking for!
180 // If we made it here then we didn't find an event with the supplied transaction id.
181 throw new SpikeException("No event with transaction id: " + transactionId + " exists in offset manager buffer");
186 * Retrieves our current view of what is the most recent offset value that can be safely committed
187 * to the event bus (meaning that all events on the topic before that offset value have been
188 * processed and shouldn't be re-consumed after a restart).
190 * @return - The next 'safe' offset.
192 public Long getNextOffsetToCommit() {
193 return nextOffsetToCommit;
198 * Finds the next slot in the ring which is marked as 'free'.
200 * @return - An index into the ring buffer.
202 private int nextFreeSlot() {
204 int currentIndex = (int) (writePointer.getAndIncrement() % bufferSize);
205 while (!ringBuffer[currentIndex].state.compareAndSet(RingEntry.FREE, RingEntry.PROCESSING)) {
206 currentIndex = (int) (writePointer.getAndIncrement() % bufferSize);
214 * Given a number, this helper method finds the next largest number that is a power of 2.
216 * @param aNumber - The number to compute the next power of two for.
218 * @return - The next largest power of 2 for the supplied number.
220 private int nextPowerOf2(int aNumber) {
223 while (powerOfTwo < aNumber) {
229 private String state2String(int aState) {
235 case RingEntry.PROCESSING:
238 case RingEntry.PUBLISHED:
242 return "UNDEFINED(" + aState + ")";
/**
 * Defines the structure of the entries in the ring buffer which represent events which are 'in
 * flight'.
 * <p>
 * An entry pairs an atomically updated state with the meta data (transaction id and event bus
 * offset) of the event it currently represents.
 */
public class RingEntry {

    private static final int FREE = 1; // Slot in buffer is available to be written to.
    private static final int PROCESSING = 2; // Slot in buffer represents an event which is waiting to be processed.
    private static final int PUBLISHED = 3; // Slot in buffer represents an event which has been published.

    /**
     * Describes the state of this entry in the ring:
     * <p>
     * FREE = This slot is currently unused and may be written to.
     * <p>
     * PROCESSING = This slot describes an event which has not yet been published.
     * <p>
     * PUBLISHED = This slot describes an event which has been published and therefore may be released.
     * <p>
     * The reference is final so that the atomic holder itself can never be swapped out; only the
     * value inside it changes.
     */
    public final AtomicInteger state = new AtomicInteger(FREE);

    /**
     * The unique identifier of the event which this entry represents. Volatile because entries are
     * written and read without any locking.
     */
    private volatile String transactionId;

    /**
     * The event bus offset associated with the event which this entry represents. Volatile for the
     * same reason as the transaction id.
     */
    private volatile long commitOffset;

    /**
     * Retrieve the transaction id for the event represented by this entry.
     *
     * @return - Transaction id, or {@code null} if none has been assigned yet.
     */
    public String getTransactionId() {
        return transactionId;
    }

    /**
     * Assigns a transaction id to this entry.
     *
     * @param transactionId - The unique id for this entry.
     */
    public void setTransactionId(String transactionId) {
        this.transactionId = transactionId;
    }

    /**
     * Retrieves the offset of the event represented by this entry.
     *
     * @return - An event bus offset value.
     */
    public long getCommitOffset() {
        return commitOffset;
    }

    /**
     * Assigns an offset value to this entry.
     *
     * @param commitOffset - Offset value for this entry.
     */
    public void setCommitOffset(long commitOffset) {
        this.commitOffset = commitOffset;
    }
}
317 * This class implements a simple background task which wakes up periodically and determines the
318 * next available offset from the ring buffer which is safe to commit to the event bus.
320 private class OffsetCommitter implements Runnable {
325 * @see java.lang.Runnable#run()
330 // Get the index into the ring buffer of the next slot to be checked.
331 int currentCommitIndex = (int) (commitPointer % bufferSize);
333 // If this entry is in the 'published' state then its offset is good to be
335 while (ringBuffer[currentCommitIndex].state.get() == RingEntry.PUBLISHED) {
337 // Grab the offset of the current entry.
338 nextOffsetToCommit = ringBuffer[currentCommitIndex].getCommitOffset();
340 // We don't need to keep the current entry alive any longer, so free it and advance
341 // to the next entry in the ring.
342 ringBuffer[currentCommitIndex].state.set(RingEntry.FREE);
345 // Update our index and loop back to check the next one. We will keep advancing
346 // as long as we have consecutive entries that are flagged as 'published'.
347 currentCommitIndex = (int) (commitPointer % bufferSize);
350 if (logger.isDebugEnabled()) {
351 logger.debug("Offset to commit to event bus: "
352 + ((nextOffsetToCommit != null) ? nextOffsetToCommit : "none"));