1 package evel_javalibrary.att.com;
\r
/**************************************************************************//**
 * This file implements the internal ring buffer used for storing and
 * forwarding events to the Collector.
 *
 * Unless otherwise specified, all software contained herein is
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/
\r
24 import java.util.concurrent.Semaphore;
\r
26 * Ringbuffer to store and Forward http(s) POST requests
\r
28 public class RingBuffer {
\r
30 // message count semaphore
\r
31 public static Semaphore countsem;
\r
33 public static Semaphore spacesem;
\r
35 public static Semaphore lock;
\r
37 public Object[] elements = null;
\r
39 public int capacity = 0;
\r
40 public int writePos = 0;
\r
41 public int available = 0;
\r
44 * Constructs Ringbuffer of specified capacity
\r
46 public RingBuffer(int capacity) {
\r
47 this.capacity = capacity;
\r
48 this.elements = new Object[capacity];
\r
49 countsem = new Semaphore(1);
\r
50 spacesem = new Semaphore(capacity);
\r
51 lock = new Semaphore(1);
\r
54 //resets the positions
\r
55 public void reset() {
\r
60 //returns available capacity
\r
61 public int remainingCapacity() {
\r
62 return this.capacity - this.available;
\r
67 //Puts Java object into ringbuffer
\r
68 public boolean put(Object element){
\r
70 boolean ret = false;
\r
75 } catch (InterruptedException e) {
\r
76 // TODO Auto-generated catch block
\r
77 e.printStackTrace();
\r
82 if(available < capacity){
\r
83 if(writePos >= capacity){
\r
86 elements[writePos] = element;
\r
100 public int put(Object[] newElements){
\r
101 return put(newElements, newElements.length);
\r
104 public int put(Object[] newElements, int length){
\r
107 spacesem.acquire();
\r
109 } catch (InterruptedException e) {
\r
110 // TODO Auto-generated catch block
\r
111 e.printStackTrace();
\r
115 if(this.writePos > this.available){
\r
116 //space above writePos is all empty
\r
118 if(length <= this.capacity - this.writePos){
\r
119 //space above writePos is sufficient to insert batch
\r
121 for(; readPos < length; readPos++){
\r
122 this.elements[this.writePos++] = newElements[readPos];
\r
124 this.available += readPos;
\r
127 countsem.release();
\r
131 //both space above writePos and below writePos is necessary to use
\r
134 int lastEmptyPos = writePos - available;
\r
136 for(; this.writePos < this.capacity; this.writePos++){
\r
137 this.elements[this.writePos] = newElements[readPos++];
\r
140 //fill into bottom of array too.
\r
143 int endPos = Math.min(length - readPos, capacity - available - readPos);
\r
144 for(;this.writePos < endPos; this.writePos++){
\r
145 this.elements[this.writePos] = newElements[readPos++];
\r
147 this.available += readPos;
\r
150 countsem.release();
\r
154 int endPos = this.capacity - this.available + this.writePos;
\r
156 for(; this.writePos < endPos; this.writePos++){
\r
157 this.elements[this.writePos] = newElements[readPos++];
\r
159 this.available += readPos;
\r
162 countsem.release();
\r
170 * Takes a stored object in Ringbuffer and releases the space
\r
173 public Object take() {
\r
178 countsem.acquire();
\r
180 } catch (InterruptedException e) {
\r
181 // TODO Auto-generated catch block
\r
182 e.printStackTrace();
\r
185 if(available == 0){
\r
189 int nextSlot = writePos - available;
\r
191 nextSlot += capacity;
\r
193 nextObj = elements[nextSlot];
\r
198 spacesem.release();
\r
204 public int take(Object[] into){
\r
205 return take(into, into.length);
\r
209 public int take(Object[] into, int length){
\r
214 countsem.acquire();
\r
216 } catch (InterruptedException e) {
\r
217 // TODO Auto-generated catch block
\r
218 e.printStackTrace();
\r
221 if(available <= writePos){
\r
222 int nextPos= writePos - available;
\r
223 int endPos = nextPos + Math.min(available, length);
\r
225 for(;nextPos < endPos; nextPos++){
\r
226 into[intoPos++] = this.elements[nextPos];
\r
228 this.available -= intoPos;
\r
232 countsem.release();
\r
236 int nextPos = writePos - available + capacity;
\r
238 int leftInTop = capacity - nextPos;
\r
239 if(length <= leftInTop){
\r
241 for(; intoPos < length; intoPos++){
\r
242 into[intoPos] = this.elements[nextPos++];
\r
244 this.available -= length;
\r
247 countsem.release();
\r
252 for(; nextPos < capacity; nextPos++){
\r
253 into[intoPos++] = this.elements[nextPos];
\r
256 //copy bottom - from 0 to writePos
\r
258 int leftToCopy = length - intoPos;
\r
259 int endPos = Math.min(writePos, leftToCopy);
\r
261 for(;nextPos < endPos; nextPos++){
\r
262 into[intoPos++] = this.elements[nextPos];
\r
265 this.available -= intoPos;
\r
269 countsem.release();
\r