29import java.io.PrintStream;
30import java.lang.reflect.Array;
73 private final Object syncRead =
new Object();
74 private final Object syncWrite =
new Object();
75 private final Object syncGlobal =
new Object();
76 private volatile T[] array;
77 private volatile int capacityPlusOne;
78 private volatile int readPos;
79 private volatile int writePos;
80 private volatile int size;
84 return "LFRingbuffer<?>[filled "+size+
" / "+(capacityPlusOne-1)+
", writePos "+writePos+
", readPos "+readPos+
"]";
88 public final void dump(
final PrintStream stream,
final String prefix) {
89 stream.println(prefix+
" "+
toString()+
" {");
90 for(
int i=0; i<capacityPlusOne; i++) {
91 stream.println(
"\t["+i+
"]: "+array[i]);
116 @SuppressWarnings(
"unchecked")
117 public
LFRingbuffer(final T[] copyFrom) throws IllegalArgumentException {
118 capacityPlusOne = copyFrom.length + 1;
119 array = (T[]) newArray(copyFrom.getClass(), capacityPlusOne);
120 resetImpl(
true, copyFrom);
142 array = newArray(arrayType, capacityPlusOne);
143 resetImpl(
false,
null );
147 public final int capacity() {
return capacityPlusOne-1; }
151 synchronized ( syncGlobal ) {
152 resetImpl(
false,
null);
153 for(
int i=0; i<capacityPlusOne; i++) {
154 this.array[i] =
null;
160 public final void resetFull(
final T[] copyFrom)
throws IllegalArgumentException {
161 resetImpl(
true, copyFrom);
164 private final void resetImpl(
final boolean full,
final T[] copyFrom)
throws IllegalArgumentException {
165 synchronized ( syncGlobal ) {
166 if(
null != copyFrom ) {
167 if( copyFrom.length != capacityPlusOne-1 ) {
168 throw new IllegalArgumentException(
"copyFrom array length "+copyFrom.length+
" != capacity "+
this);
170 System.arraycopy(copyFrom, 0, array, 0, copyFrom.length);
171 array[capacityPlusOne-1] =
null;
173 throw new IllegalArgumentException(
"copyFrom array is null");
175 readPos = capacityPlusOne - 1;
177 writePos = readPos - 1;
178 size = capacityPlusOne - 1;
193 public final boolean isEmpty() {
return 0 == size; }
196 public final boolean isFull() {
return capacityPlusOne - 1 == size; }
205 public final T
get() {
207 return getImpl(
false,
false);
208 }
catch (
final InterruptedException ie) {
throw new RuntimeException(ie); }
219 return getImpl(
true,
false);
225 return getImpl(
false,
true);
226 }
catch (
final InterruptedException ie) {
throw new RuntimeException(ie); }
230 return getImpl(
true,
true);
233 private final T getImpl(
final boolean blocking,
final boolean peek)
throws InterruptedException {
234 int localReadPos = readPos;
235 if( localReadPos == writePos ) {
237 synchronized( syncRead ) {
238 while( localReadPos == writePos ) {
246 localReadPos = (localReadPos + 1) % capacityPlusOne;
247 final T r = array[localReadPos];
249 array[localReadPos] =
null;
250 synchronized ( syncWrite ) {
252 readPos = localReadPos;
253 syncWrite.notifyAll();
266 public final boolean put(
final T e) {
268 return putImpl(e,
false,
false);
269 }
catch (
final InterruptedException ie) {
throw new RuntimeException(ie); }
279 public final void putBlocking(
final T e)
throws InterruptedException {
280 if( !putImpl(e,
false,
true) ) {
281 throw new InternalError(
"Blocking put failed: "+
this);
292 public final boolean putSame(
final boolean blocking)
throws InterruptedException {
293 return putImpl(
null,
true, blocking);
296 private final boolean putImpl(
final T e,
final boolean sameRef,
final boolean blocking)
throws InterruptedException {
297 int localWritePos = writePos;
298 localWritePos = (localWritePos + 1) % capacityPlusOne;
299 if( localWritePos == readPos ) {
301 synchronized( syncWrite ) {
302 while( localWritePos == readPos ) {
311 array[localWritePos] = e;
313 synchronized ( syncRead ) {
315 writePos = localWritePos;
316 syncRead.notifyAll();
324 synchronized ( syncRead ) {
325 if( capacityPlusOne - 1 - size < count ) {
326 while( capacityPlusOne - 1 - size < count ) {
334 public final void growEmptyBuffer(
final T[] newElements)
throws IllegalStateException, IllegalArgumentException {
335 synchronized( syncGlobal ) {
336 if(
null == newElements ) {
337 throw new IllegalArgumentException(
"newElements is null");
339 @SuppressWarnings(
"unchecked")
340 final Class<? extends T[]> arrayTypeInternal = (Class<? extends T[]>) array.getClass();
341 @SuppressWarnings(
"unchecked")
342 final Class<? extends T[]> arrayTypeNew = (Class<? extends T[]>) newElements.getClass();
343 if( arrayTypeInternal != arrayTypeNew ) {
344 throw new IllegalArgumentException(
"newElements array-type mismatch, internal "+arrayTypeInternal+
", newElements "+arrayTypeNew);
347 throw new IllegalStateException(
"Buffer is not empty: "+
this);
349 if( readPos != writePos ) {
350 throw new InternalError(
"R/W pos not equal: "+
this);
352 if( readPos != writePos ) {
353 throw new InternalError(
"R/W pos not equal at empty: "+
this);
356 final int growAmount = newElements.length;
357 final int newCapacity = capacityPlusOne + growAmount;
358 final T[] oldArray = array;
359 final T[] newArray = newArray(arrayTypeInternal, newCapacity);
362 writePos += growAmount;
365 System.arraycopy(oldArray, 0, newArray, 0, readPos+1);
367 if( growAmount > 0 ) {
368 System.arraycopy(newElements, 0, newArray, readPos+1, growAmount);
370 final int tail = capacityPlusOne-1-readPos;
372 System.arraycopy(oldArray, readPos+1, newArray, writePos+1, tail);
376 capacityPlusOne = newCapacity;
382 public final void growFullBuffer(
final int growAmount)
throws IllegalStateException, IllegalArgumentException {
383 synchronized ( syncGlobal ) {
384 if( 0 > growAmount ) {
385 throw new IllegalArgumentException(
"amount "+growAmount+
" < 0 ");
387 if( capacityPlusOne-1 != size ) {
388 throw new IllegalStateException(
"Buffer is not full: "+
this);
390 final int wp1 = ( writePos + 1 ) % capacityPlusOne;
391 if( wp1 != readPos ) {
392 throw new InternalError(
"R != W+1 pos at full: "+
this);
394 @SuppressWarnings(
"unchecked")
395 final Class<? extends T[]> arrayTypeInternal = (Class<? extends T[]>) array.getClass();
397 final int newCapacity = capacityPlusOne + growAmount;
398 final T[] oldArray = array;
399 final T[] newArray = newArray(arrayTypeInternal, newCapacity);
402 readPos = ( writePos + 1 + growAmount ) % newCapacity;
405 System.arraycopy(oldArray, 0, newArray, 0, writePos+1);
407 final int tail = capacityPlusOne-1-writePos;
409 System.arraycopy(oldArray, writePos+1, newArray, readPos, tail);
412 capacityPlusOne = newCapacity;
417 @SuppressWarnings(
"unchecked")
418 private static <T> T[] newArray(final Class<? extends T[]> arrayType, final
int length) {
419 return ((Object)arrayType == (Object)Object[].
class)
420 ? (T[])
new Object[length]
421 : (T[]) Array.newInstance(arrayType.getComponentType(), length);
Simple implementation of Ringbuffer, exposing lock-free get*(..) and put*(..) methods.
final void clear()
Resets the read and write position according to an empty ring buffer and set all ring buffer slots to...
final void waitForFreeSlots(final int count)
Blocks until at least count free slots become available.
final boolean putSame(final boolean blocking)
Enqueues the same element at it's write position, if not full.Returns true if successful,...
final void resetFull(final T[] copyFrom)
Resets the read and write position according to a full ring buffer and fill all slots w/ elements of ...
LFRingbuffer(final Class<? extends T[]> arrayType, final int capacity)
Create an empty ring buffer instance w/ the given net capacity.
final boolean isFull()
Returns true if this ring buffer is full, otherwise false.
final boolean put(final T e)
Enqueues the given element.Returns true if successful, otherwise false in case buffer is full....
final boolean isEmpty()
Returns true if this ring buffer is empty, otherwise false.
final T getBlocking()
Dequeues the oldest enqueued element.The returned ring buffer slot will be set to null to release the...
final void putBlocking(final T e)
Enqueues the given element.Method blocks until a free slot becomes available via get.
final void growEmptyBuffer(final T[] newElements)
Grows an empty ring buffer, increasing it's capacity about the amount.
final int size()
Returns the number of elements in this ring buffer.
final int getFreeSlots()
Returns the number of free slots available to put.
final void dump(final PrintStream stream, final String prefix)
Debug functionality - Dumps the contents of the internal array.
final int capacity()
Returns the net capacity of this ring buffer.
final T peek()
Peeks the next element at the read position w/o modifying pointer, nor blocking.
final T peekBlocking()
Peeks the next element at the read position w/o modifying pointer, but w/ blocking.
final void growFullBuffer(final int growAmount)
Grows a full ring buffer, increasing it's capacity about the amount.
final String toString()
Returns a short string representation incl.
Ring buffer interface, a.k.a circular buffer.