#ifndef JAU_RINGBUFFER_HPP_
#define JAU_RINGBUFFER_HPP_

#include <condition_variable>
// further includes implied by the code below (assumed; the excerpt elides the original list):
#include <cstdio>
#include <cstring>
#include <mutex>
#include <string>
#include <vector>
#if DEBUG_RINGBUFFER
    #define _DEBUG_DUMP(...) { dump(stderr, __VA_ARGS__); }
    #define _DEBUG_DUMP2(a, ...) { a.dump(stderr, __VA_ARGS__); }
    #define _DEBUG_PRINT(...) { fprintf(stderr, __VA_ARGS__); }
#else
    #define _DEBUG_DUMP(...)
    #define _DEBUG_DUMP2(a, ...)
    #define _DEBUG_PRINT(...)
#endif
namespace jau {

/**
 * Ring buffer implementation, a.k.a circular buffer,
 * exposing lock-free get*(..) and put*(..) methods.
 */
template <typename Value_type, typename Size_type,
          bool use_memmove = std::is_trivially_copyable_v<Value_type> || is_container_memmove_compliant_v<Value_type>,
          bool use_memcpy  = std::is_trivially_copyable_v<Value_type>,
          bool use_secmem  = is_enforcing_secmem_v<Value_type>
         >
class ringbuffer {
    public:
        constexpr static const bool uses_memmove = use_memmove;
        constexpr static const bool uses_memcpy  = use_memcpy;
        constexpr static const bool uses_secmem  = use_secmem;

        typedef Value_type                                  value_type;
        typedef value_type*                                 pointer;
        typedef const value_type*                           const_pointer;
        typedef value_type&                                 reference;
        typedef const value_type&                           const_reference;
        typedef Size_type                                   size_type;
        typedef typename std::make_signed<size_type>::type  difference_type;
        typedef jau::callocator<Value_type>                 allocator_type;

    private:
        constexpr static const bool is_integral = std::is_integral_v<Value_type>;

        typedef std::remove_const_t<Value_type> value_type_mutable;
        typedef value_type_mutable* pointer_mutable;
        static constexpr void* voidptr_cast(const_pointer p) { return reinterpret_cast<void*>( const_cast<pointer_mutable>( p ) ); }
        bool multi_pc_enabled = false;

        mutable std::mutex syncWrite, syncMultiWrite; // lock-hierarchy: syncMultiWrite -> syncWrite
        std::condition_variable cvWrite;

        mutable std::mutex syncRead,  syncMultiRead;  // lock-hierarchy: syncMultiRead -> syncRead
        std::condition_variable cvRead;
        // seq-cst atomic position type (assumption: jau::ordered_atomic, as elsewhere in jaulib)
        typedef ordered_atomic<Size_type, std::memory_order_seq_cst> sc_atomic_Size_type;

        allocator_type alloc_inst; // allocator instance, used by newArray()/freeArray()

        Size_type capacityPlusOne;
        Value_type * array;        // allocated via newArray()
        sc_atomic_Size_type readPos;
        sc_atomic_Size_type writePos;

        // interrupt- and end-of-stream flags used throughout (declarations elided in the excerpt;
        // restored here under the assumption of jau::relaxed_atomic_bool)
        relaxed_atomic_bool interrupted_read  = false;
        relaxed_atomic_bool interrupted_write = false;
        relaxed_atomic_bool end_of_input      = false;
        constexpr Value_type * newArray(const Size_type count) noexcept {
            if( 0 < count ) {
                value_type * m = alloc_inst.allocate(count);
                if( nullptr == m ) {
                    // initial state is dtor-equivalent, hence OK to abort out of a noexcept method
                    ABORT("Error: bad_alloc: alloc %zu elements * %zu bytes/element = %zu bytes failed",
                          (size_t)count, sizeof(value_type), (size_t)(count*sizeof(value_type)));
                }
                return m;
            } else {
                return nullptr;
            }
        }
        constexpr void freeArray(Value_type ** a, const Size_type count) noexcept {
            if( nullptr != *a ) {
                alloc_inst.deallocate(*a, count);
                *a = nullptr;
            } else {
                ABORT("ringbuffer::freeArray with nullptr");
            }
        }
        constexpr void dtor_one(const Size_type pos) {
            ( array + pos )->~value_type(); // placement new -> manual destruction
            if constexpr ( use_secmem ) {
                ::explicit_bzero(voidptr_cast(array + pos), sizeof(value_type));
            }
        }
        constexpr void dtor_one(pointer elem) {
            elem->~value_type(); // placement new -> manual destruction
            if constexpr ( use_secmem ) {
                ::explicit_bzero(voidptr_cast(elem), sizeof(value_type));
            }
        }
        Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            Size_type available = size();
            if( available < min_count && min_count < capacityPlusOne && !end_of_input ) {
                interrupted_read = false;
                std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                available = size();
                const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                while( !interrupted_read && !end_of_input && min_count > available ) {
                    if( fractions_i64::zero == timeout ) {
                        cvWrite.wait(lockWrite);
                        available = size();
                    } else {
                        std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
                        available = size();
                        if( std::cv_status::timeout == s && min_count > available ) {
                            timeout_occurred = true;
                            return available;
                        }
                    }
                }
                if( interrupted_read ) {
                    interrupted_read = false;
                }
            }
            return available;
        }
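        // Hedged usage sketch ('rb' is a hypothetical instance): a zero timeout selects the plain
        // cvWrite.wait() branch above, i.e. waits indefinitely; a positive timeout uses
        // wait_until() and flags expiry via timeout_occurred.
        //
        //   bool timedout = false;
        //   Size_type avail = rb.waitForElements(4 /* min_count */, jau::fractions_i64::zero, timedout);
        //   // avail may still be < min_count if interrupted or end-of-input was signaled.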
        Size_type waitForFreeSlotsImpl(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            Size_type available = freeSlots();
            if( min_count > available && min_count < capacityPlusOne ) {
                interrupted_write = false;
                std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
                available = freeSlots();
                const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                while( !interrupted_write && min_count > available ) {
                    if( fractions_i64::zero == timeout ) {
                        cvRead.wait(lockRead);
                        available = freeSlots();
                    } else {
                        std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
                        available = freeSlots();
                        if( std::cv_status::timeout == s && min_count > available ) {
                            timeout_occurred = true;
                            return available;
                        }
                    }
                }
                if( interrupted_write ) {
                    interrupted_write = false;
                }
            }
            return available;
        }
        constexpr void clearImpl() noexcept {
            const Size_type size_ = size();
            if( 0 < size_ ) {
                if constexpr ( use_memcpy ) {
                    if constexpr ( use_secmem ) {
                        ::explicit_bzero(voidptr_cast(&array[0]), capacityPlusOne*sizeof(Value_type));
                    }
                    readPos = writePos.load();
                } else {
                    Size_type localReadPos = readPos;
                    for(Size_type i=0; i<size_; i++) {
                        localReadPos = (localReadPos + 1) % capacityPlusOne;
                        ( array + localReadPos )->~value_type(); // manual destruction
                    }
                    if( writePos != localReadPos ) {
                        // Avoid exception, abort!
                        ABORT("copy segment error: this %s, readPos %d/%d; writePos %d",
                              toString().c_str(), readPos.load(), localReadPos, writePos.load());
                    }
                    if constexpr ( use_secmem ) {
                        ::explicit_bzero(voidptr_cast(&array[0]), capacityPlusOne*sizeof(Value_type));
                    }
                    readPos = localReadPos;
                }
            }
        }
        constexpr void clearAndZeroMemImpl() noexcept {
            if constexpr ( use_memcpy ) {
                ::explicit_bzero(voidptr_cast(&array[0]), capacityPlusOne*sizeof(Value_type));
                readPos = writePos.load();
            } else {
                const Size_type size_ = size();
                Size_type localReadPos = readPos;
                for(Size_type i=0; i<size_; i++) {
                    localReadPos = (localReadPos + 1) % capacityPlusOne;
                    ( array + localReadPos )->~value_type(); // manual destruction
                }
                if( writePos != localReadPos ) {
                    // Avoid exception, abort!
                    ABORT("copy segment error: this %s, readPos %d/%d; writePos %d",
                          toString().c_str(), readPos.load(), localReadPos, writePos.load());
                }
                ::explicit_bzero(voidptr_cast(&array[0]), capacityPlusOne*sizeof(Value_type));
                readPos = localReadPos;
            }
        }
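        // Design note: ::explicit_bzero() is used rather than ::memset(), since a compiler may
        // elide a memset whose buffer is never read afterwards; explicit_bzero guarantees the
        // wipe, which is what the use_secmem policy and clearAndZeroMemImpl() rely upon.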
        void cloneFrom(const bool allocArrayAndCapacity, const ringbuffer & source) noexcept {
            if( allocArrayAndCapacity ) {
                if( nullptr != array ) {
                    clearImpl();
                    freeArray(&array, capacityPlusOne);
                }
                capacityPlusOne = source.capacityPlusOne;
                array = newArray(capacityPlusOne);
            } else if( capacityPlusOne != source.capacityPlusOne ) {
                ABORT( ("capacityPlusOne not equal: this "+toString()+", source "+source.toString() ).c_str() );
            } else {
                clearImpl();
            }

            readPos = source.readPos.load();
            writePos = source.writePos.load();

            if constexpr ( use_memcpy ) {
                ::memcpy(voidptr_cast(&array[0]),
                         &source.array[0],
                         capacityPlusOne*sizeof(Value_type));
            } else {
                const Size_type size_ = size();
                Size_type localWritePos = readPos;
                for(Size_type i=0; i<size_; i++) {
                    localWritePos = (localWritePos + 1) % capacityPlusOne;
                    new (const_cast<pointer_mutable>(array + localWritePos)) value_type( source.array[localWritePos] );
                }
                if( writePos != localWritePos ) {
                    ABORT( ("copy segment error: this "+toString()+", localWritePos "+std::to_string(localWritePos)+
                            "; source "+source.toString()).c_str() );
                }
            }
        }
        ringbuffer& assignCopyImpl(const ringbuffer &_source) noexcept {
            if( this == &_source ) {
                return *this;
            }
            if( capacityPlusOne != _source.capacityPlusOne ) {
                cloneFrom(true, _source);
            } else {
                cloneFrom(false, _source);
            }
            return *this;
        }
        void resetImpl(const Value_type * copyFrom, const Size_type copyFromCount) noexcept {
            // fill with copyFrom elements, growing the buffer if required
            if( nullptr != copyFrom && 0 < copyFromCount ) {
                if( copyFromCount > capacityPlusOne-1 ) {
                    // new blank resized array
                    if( nullptr != array ) {
                        clearImpl();
                        freeArray(&array, capacityPlusOne);
                    }
                    capacityPlusOne = copyFromCount + 1;
                    array = newArray(capacityPlusOne);
                    readPos  = 0;
                    writePos = 0;
                } else {
                    clearImpl();
                }
                if constexpr ( use_memcpy ) {
                    ::memcpy(voidptr_cast(&array[0]),
                             copyFrom,
                             copyFromCount*sizeof(Value_type));
                    readPos  = capacityPlusOne - 1; // last read-pos
                    writePos = copyFromCount - 1;   // last write-pos
                } else {
                    Size_type localWritePos = writePos;
                    for(Size_type i=0; i<copyFromCount; i++) {
                        localWritePos = (localWritePos + 1) % capacityPlusOne;
                        new (const_cast<pointer_mutable>(array + localWritePos)) value_type( copyFrom[i] );
                    }
                    writePos = localWritePos;
                }
            } else {
                clearImpl();
            }
        }
        bool peekImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            if( !std::is_copy_constructible_v<Value_type> ) {
                ABORT("Value_type is not copy constructible");
                return false;
            }
            if( 1 >= capacityPlusOne ) {
                return false; // closed or zero capacity
            }
            const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
            Size_type localReadPos = oldReadPos;
            if( localReadPos == writePos ) {
                if( blocking && !end_of_input ) {
                    interrupted_read = false;
                    std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                    const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                    while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
                        if( fractions_i64::zero == timeout ) {
                            cvWrite.wait(lockWrite);
                        } else {
                            std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
                            if( std::cv_status::timeout == s && localReadPos == writePos ) {
                                timeout_occurred = true;
                                return false;
                            }
                        }
                    }
                    if( interrupted_read || end_of_input ) {
                        interrupted_read = false;
                        if( localReadPos == writePos ) { // interrupted or end-of-input w/o available data
                            return false;
                        }
                    }
                } else {
                    return false;
                }
            }
            localReadPos = (localReadPos + 1) % capacityPlusOne;
            if constexpr ( !is_integral && use_memcpy ) {
                ::memcpy(voidptr_cast(&dest),
                         &array[localReadPos],
                         sizeof(Value_type));
            } else {
                dest = array[localReadPos];
            }
            readPos = oldReadPos; // SC-DRF release atomic readPos (complete the acquire-release even at peek)
            return true;
        }
        bool moveOutImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            if( 1 >= capacityPlusOne ) {
                return false; // closed or zero capacity
            }
            const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
            Size_type localReadPos = oldReadPos;
            if( localReadPos == writePos ) {
                if( blocking && !end_of_input ) {
                    interrupted_read = false;
                    std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                    const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                    while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
                        if( fractions_i64::zero == timeout ) {
                            cvWrite.wait(lockWrite);
                        } else {
                            std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
                            if( std::cv_status::timeout == s && localReadPos == writePos ) {
                                timeout_occurred = true;
                                return false;
                            }
                        }
                    }
                    if( interrupted_read || end_of_input ) {
                        interrupted_read = false;
                        if( localReadPos == writePos ) { // interrupted or end-of-input w/o available data
                            return false;
                        }
                    }
                } else {
                    return false;
                }
            }
            localReadPos = (localReadPos + 1) % capacityPlusOne;
            if constexpr ( is_integral ) {
                dest = array[localReadPos];
                if constexpr ( use_secmem ) {
                    ::explicit_bzero(voidptr_cast(&array[localReadPos]), sizeof(Value_type));
                }
            } else if constexpr ( use_memcpy ) {
                ::memcpy(voidptr_cast(&dest),
                         &array[localReadPos],
                         sizeof(Value_type));
                if constexpr ( use_secmem ) {
                    ::explicit_bzero(voidptr_cast(&array[localReadPos]), sizeof(Value_type));
                }
            } else {
                dest = std::move( array[localReadPos] );
                dtor_one( localReadPos );
            }
            {
                std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
                readPos = localReadPos;
            }
            cvRead.notify_all(); // notify waiting putter
            return true;
        }
        Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_,
                              const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            const Size_type min_count = std::min(dest_len, min_count_);
            Value_type *iter_out = dest;

            if( min_count >= capacityPlusOne ) {
                return 0; // can never be satisfied
            }
            if( 0 == min_count ) {
                return 0;
            }
            const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
            Size_type localReadPos = oldReadPos;
            Size_type available = size();
            if( min_count > available ) {
                if( blocking && !end_of_input ) {
                    interrupted_read = false;
                    std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                    available = size();
                    const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                    while( !interrupted_read && !end_of_input && min_count > available ) {
                        if( fractions_i64::zero == timeout ) {
                            cvWrite.wait(lockWrite);
                            available = size();
                        } else {
                            std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
                            available = size();
                            if( std::cv_status::timeout == s && min_count > available ) {
                                timeout_occurred = true;
                                return 0;
                            }
                        }
                    }
                    if( interrupted_read || end_of_input ) {
                        interrupted_read = false;
                        if( min_count > available ) { // interrupted or end-of-input w/o enough data
                            return 0;
                        }
                    }
                } else {
                    return 0;
                }
            }
            const Size_type count = std::min(dest_len, available); // actual dequeue amount
            /**
             * Ring buffer layout, also used by copyIntoImpl():
             *   Empty [RW][ ][ ][ ][ ][ ][ ][ ] ; W == R
             *   Avail [ ][ ][R][.][.][.][.][W]  ; W >  R
             *   Avail [.][.][.][W][ ][ ][R][.]  ; R >  W
             */
            Size_type togo_count = count;
            const Size_type localWritePos = writePos;
            if( localReadPos > localWritePos ) {
                // R > W: dequeue the pending tail of the array first
                localReadPos = ( localReadPos + 1 ) % capacityPlusOne;
                const Size_type tail_count = std::min(togo_count, capacityPlusOne - localReadPos);
                if constexpr ( use_memcpy ) {
                    ::memcpy(voidptr_cast(iter_out),
                             &array[localReadPos],
                             tail_count*sizeof(Value_type));
                    if constexpr ( use_secmem ) {
                        ::explicit_bzero(voidptr_cast(&array[localReadPos]), tail_count*sizeof(Value_type));
                    }
                } else {
                    for(Size_type i=0; i<tail_count; i++) {
                        iter_out[i] = std::move( array[localReadPos+i] );
                        dtor_one( localReadPos + i );
                    }
                }
                localReadPos = ( localReadPos + tail_count - 1 ) % capacityPlusOne;
                togo_count -= tail_count;
                iter_out += tail_count;
            }
            if( togo_count > 0 ) {
                // dequeue the remaining pending head
                localReadPos = ( localReadPos + 1 ) % capacityPlusOne;
                if constexpr ( use_memcpy ) {
                    ::memcpy(voidptr_cast(iter_out),
                             &array[localReadPos],
                             togo_count*sizeof(Value_type));
                    if constexpr ( use_secmem ) {
                        ::explicit_bzero(voidptr_cast(&array[localReadPos]), togo_count*sizeof(Value_type));
                    }
                } else {
                    for(Size_type i=0; i<togo_count; i++) {
                        iter_out[i] = std::move( array[localReadPos+i] );
                        dtor_one( localReadPos + i );
                    }
                }
                localReadPos = ( localReadPos + togo_count - 1 ) % capacityPlusOne;
            }
            {
                std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
                readPos = localReadPos;
            }
            cvRead.notify_all(); // notify waiting putter
            return count;
        }
        Size_type dropImpl(Size_type count, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            if( count >= capacityPlusOne ) {
                if( blocking ) {
                    return 0; // can never be satisfied
                }
                count = capacityPlusOne-1; // drop all available elements
            }
            if( 0 == count ) {
                return 0;
            }
            const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
            Size_type localReadPos = oldReadPos;
            Size_type available = size();
            if( count > available ) {
                if( blocking && !end_of_input ) {
                    interrupted_read = false;
                    std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                    available = size();
                    const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                    while( !interrupted_read && !end_of_input && count > available ) {
                        if( fractions_i64::zero == timeout ) {
                            cvWrite.wait(lockWrite);
                            available = size();
                        } else {
                            std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
                            available = size();
                            if( std::cv_status::timeout == s && count > available ) {
                                timeout_occurred = true;
                                return 0;
                            }
                        }
                    }
                    if( interrupted_read || end_of_input ) {
                        interrupted_read = false;
                        if( count > available ) { // interrupted or end-of-input w/o enough data
                            return 0;
                        }
                    }
                } else {
                    count = available; // non-blocking: drop whatever is available
                }
            }
            Size_type togo_count = count;
            const Size_type localWritePos = writePos;
            if( localReadPos > localWritePos ) {
                // R > W: drop the pending tail of the array first
                localReadPos = ( localReadPos + 1 ) % capacityPlusOne;
                const Size_type tail_count = std::min(togo_count, capacityPlusOne - localReadPos);
                if constexpr ( use_memcpy ) {
                    if constexpr ( use_secmem ) {
                        ::explicit_bzero(voidptr_cast(&array[localReadPos]), tail_count*sizeof(Value_type));
                    }
                } else {
                    for(Size_type i=0; i<tail_count; i++) {
                        dtor_one( localReadPos+i );
                    }
                }
                localReadPos = ( localReadPos + tail_count - 1 ) % capacityPlusOne;
                togo_count -= tail_count;
            }
            if( togo_count > 0 ) {
                // drop the remaining pending head
                localReadPos = ( localReadPos + 1 ) % capacityPlusOne;
                if constexpr ( use_memcpy ) {
                    if constexpr ( use_secmem ) {
                        ::explicit_bzero(voidptr_cast(&array[localReadPos]), togo_count*sizeof(Value_type));
                    }
                } else {
                    for(Size_type i=0; i<togo_count; i++) {
                        dtor_one( localReadPos+i );
                    }
                }
                localReadPos = ( localReadPos + togo_count - 1 ) % capacityPlusOne;
            }
            {
                std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
                readPos = localReadPos;
            }
            cvRead.notify_all(); // notify waiting putter
            return count;
        }
        bool moveIntoImpl(Value_type &&e, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            if( 1 >= capacityPlusOne ) {
                return false; // closed or zero capacity
            }
            Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
            localWritePos = (localWritePos + 1) % capacityPlusOne;
            if( localWritePos == readPos ) {
                if( blocking ) {
                    interrupted_write = false;
                    std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
                    const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                    while( !interrupted_write && localWritePos == readPos ) {
                        if( fractions_i64::zero == timeout ) {
                            cvRead.wait(lockRead);
                        } else {
                            std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
                            if( std::cv_status::timeout == s && localWritePos == readPos ) {
                                timeout_occurred = true;
                                return false;
                            }
                        }
                    }
                    if( interrupted_write ) {
                        interrupted_write = false;
                        return false;
                    }
                } else {
                    return false;
                }
            }
            if constexpr ( is_integral ) {
                array[localWritePos] = e;
            } else if constexpr ( use_memcpy ) {
                ::memcpy(voidptr_cast(&array[localWritePos]),
                         &e,
                         sizeof(Value_type));
            } else {
                new (const_cast<pointer_mutable>(array + localWritePos)) value_type( std::move(e) );
            }
            {
                std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
                writePos = localWritePos;
            }
            cvWrite.notify_all(); // notify waiting getter
            return true;
        }
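        // Concurrency note: writePos is published while holding syncWrite, the same mutex a
        // blocked reader re-checks writePos under in peekImpl()/moveOutImpl(); combined with the
        // seq-cst atomic positions this yields a sequentially consistent, data-race-free (SC-DRF)
        // hand-off, completed by cvWrite.notify_all() waking any waiting reader.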
        bool copyIntoImpl(const Value_type &e, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            if( !std::is_copy_constructible_v<Value_type> ) {
                ABORT("Value_type is not copy constructible");
                return false;
            }
            if( 1 >= capacityPlusOne ) {
                return false; // closed or zero capacity
            }
            Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
            localWritePos = (localWritePos + 1) % capacityPlusOne;
            if( localWritePos == readPos ) {
                if( blocking ) {
                    interrupted_write = false;
                    std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
                    const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                    while( !interrupted_write && localWritePos == readPos ) {
                        if( fractions_i64::zero == timeout ) {
                            cvRead.wait(lockRead);
                        } else {
                            std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
                            if( std::cv_status::timeout == s && localWritePos == readPos ) {
                                timeout_occurred = true;
                                return false;
                            }
                        }
                    }
                    if( interrupted_write ) {
                        interrupted_write = false;
                        return false;
                    }
                } else {
                    return false;
                }
            }
            if constexpr ( is_integral ) {
                array[localWritePos] = e;
            } else if constexpr ( use_memcpy ) {
                ::memcpy(voidptr_cast(&array[localWritePos]),
                         &e,
                         sizeof(Value_type));
            } else {
                new (const_cast<pointer_mutable>(array + localWritePos)) value_type( e );
            }
            {
                std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
                writePos = localWritePos;
            }
            cvWrite.notify_all(); // notify waiting getter
            return true;
        }
        bool copyIntoImpl(const Value_type *first, const Value_type* last,
                          const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            timeout_occurred = false;
            if( !std::is_copy_constructible_v<Value_type> ) {
                ABORT("Value_type is not copy constructible");
                return false;
            }
            const Value_type *iter_in = first;
            const Size_type total_count = last - first;

            if( total_count >= capacityPlusOne ) {
                return false; // can never be satisfied
            }
            if( 0 == total_count ) {
                return true;
            }
            Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
            Size_type available = freeSlots();
            if( total_count > available ) {
                if( blocking ) {
                    interrupted_write = false;
                    std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
                    available = freeSlots();
                    const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
                    while( !interrupted_write && total_count > available ) {
                        if( fractions_i64::zero == timeout ) {
                            cvRead.wait(lockRead);
                            available = freeSlots();
                        } else {
                            std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
                            available = freeSlots();
                            if( std::cv_status::timeout == s && total_count > available ) {
                                timeout_occurred = true;
                                return false;
                            }
                        }
                    }
                    if( interrupted_write ) {
                        interrupted_write = false;
                        return false;
                    }
                } else {
                    return false;
                }
            }
            Size_type togo_count = total_count;
            const Size_type localReadPos = readPos;
            if( localWritePos >= localReadPos ) {
                // W >= R (incl. the empty case): fill the tail of the array first
                localWritePos = ( localWritePos + 1 ) % capacityPlusOne;
                const Size_type tail_count = std::min(togo_count, capacityPlusOne - localWritePos);
                if constexpr ( use_memcpy ) {
                    ::memcpy(voidptr_cast(&array[localWritePos]),
                             iter_in,
                             tail_count*sizeof(Value_type));
                } else {
                    for(Size_type i=0; i<tail_count; i++) {
                        new (const_cast<pointer_mutable>(array + localWritePos + i)) value_type( iter_in[i] );
                    }
                }
                localWritePos = ( localWritePos + tail_count - 1 ) % capacityPlusOne;
                togo_count -= tail_count;
                iter_in += tail_count;
            }
            if( togo_count > 0 ) {
                // fill the remaining head of the array
                localWritePos = ( localWritePos + 1 ) % capacityPlusOne;
                if constexpr ( use_memcpy ) {
                    ::memcpy(voidptr_cast(&array[localWritePos]),
                             iter_in,
                             togo_count*sizeof(Value_type));
                } else {
                    for(Size_type i=0; i<togo_count; i++) {
                        new (const_cast<pointer_mutable>(array + localWritePos + i)) value_type( iter_in[i] );
                    }
                }
                localWritePos = ( localWritePos + togo_count - 1 ) % capacityPlusOne;
            }
            {
                std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
                writePos = localWritePos;
            }
            cvWrite.notify_all(); // notify waiting getter
            return true;
        }
        void recapacityImpl(const Size_type newCapacity) {
            const Size_type size_ = size();

            if( capacityPlusOne == newCapacity+1 ) {
                return;
            }
            if( size_ > newCapacity ) {
                // the original rejects shrinking below the current element count (exact error handling elided in this excerpt)
                ABORT("recapacity %zu < size %zu: %s", (size_t)newCapacity, (size_t)size_, toString().c_str());
            }

            // save current data
            const Size_type oldCapacityPlusOne = capacityPlusOne;
            Value_type * oldArray = array;
            Size_type oldReadPos = readPos;

            // new blank resized array, starting at position 0
            capacityPlusOne = newCapacity + 1;
            array = newArray(capacityPlusOne);
            readPos = 0;
            writePos = 0;

            // move the saved data over
            if( nullptr != oldArray && 0 < size_ ) {
                Size_type localWritePos = writePos;
                for(Size_type i=0; i<size_; i++) {
                    localWritePos = (localWritePos + 1) % capacityPlusOne;
                    oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne;
                    new (const_cast<pointer_mutable>( array + localWritePos )) value_type( std::move( oldArray[oldReadPos] ) );
                    dtor_one( oldArray + oldReadPos );
                }
                writePos = localWritePos;
            }
            freeArray(&oldArray, oldCapacityPlusOne); // and release
        }
        void closeImpl(const bool zeromem) noexcept {
            if( zeromem ) {
                clearAndZeroMemImpl();
            } else {
                clearImpl();
            }
            freeArray(&array, capacityPlusOne);

            capacityPlusOne = 1; // net capacity zero
            array = newArray(capacityPlusOne);
            readPos = 0;
            writePos = 0;
        }
    public:
        /** Returns a short string representation incl. size, positions and state flags. */
        std::string toString() const noexcept {
            const std::string e_s = isEmpty() ? ", empty" : "";
            const std::string f_s = isFull() ? ", full" : "";
            // exact composition abbreviated in this excerpt
            return "ringbuffer[size "+std::to_string(size())+" / "+std::to_string(capacityPlusOne-1)+
                   ", writePos "+std::to_string(writePos.load())+", readPos "+std::to_string(readPos.load())+e_s+f_s+"]";
        }
        /** Debug functionality - Dumps the contents of the internal array. */
        void dump(FILE *stream, const std::string& prefix) const noexcept {
            fprintf(stream, "%s %s, array %p\n", prefix.c_str(), toString().c_str(), (void*)array);
        }
        std::string get_info() const noexcept {
            const std::string e_s = isEmpty() ? ", empty" : "";
            const std::string f_s = isFull() ? ", full" : "";
            const std::string mode_s = multi_pc_enabled ? ", mpc" : ", one"; // multi- vs single producer-consumer
            return "ringbuffer["+jau::to_hexstring(this)+
                   ", size "+std::to_string(size())+" / "+std::to_string(capacityPlusOne-1)+
                   ", "+e_s+f_s+mode_s+
                   ", type[integral "+std::to_string(is_integral)+
                   ", trivialCpy "+std::to_string(std::is_trivially_copyable_v<Value_type>)+
                   ", memmove "+std::to_string(use_memmove)+
                   ", memcpy "+std::to_string(use_memcpy)+
                   ", secmem "+std::to_string(use_secmem)+
                   "]]";
        }
        /** Create a full ring buffer instance w/ the given array's net capacity and content. */
        ringbuffer(const std::vector<Value_type> & copyFrom) noexcept
        : capacityPlusOne(copyFrom.size() + 1), array(newArray(capacityPlusOne)),
          readPos(0), writePos(0)
        {
            resetImpl(copyFrom.data(), copyFrom.size());
        }
        ringbuffer(const Value_type *copyFrom, const Size_type copyFromSize) noexcept
        : capacityPlusOne(copyFromSize + 1), array(newArray(capacityPlusOne)),
          readPos(0), writePos(0)
        {
            resetImpl(copyFrom, copyFromSize);
        }
        /** Create an empty ring buffer instance w/ the given net capacity. */
        ringbuffer(const Size_type capacity) noexcept
        : capacityPlusOne(capacity + 1), array(newArray(capacityPlusOne)),
          readPos(0), writePos(0)
        { }
        ~ringbuffer() noexcept {
            if( nullptr != array ) {
                clearImpl();
                freeArray(&array, capacityPlusOne);
            }
        }
        ringbuffer(const ringbuffer &_source) noexcept
        : capacityPlusOne(_source.capacityPlusOne), array(newArray(capacityPlusOne)),
          readPos(0), writePos(0)
        {
            std::unique_lock<std::mutex> lockMultiReadS(_source.syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
            std::unique_lock<std::mutex> lockMultiWriteS(_source.syncMultiWrite, std::defer_lock);
            std::lock(lockMultiReadS, lockMultiWriteS);
            cloneFrom(false, _source);
        }

        ringbuffer(ringbuffer &&o) noexcept = default;
        ringbuffer& operator=(const ringbuffer &_source) noexcept {
            if( this == &_source ) {
                return *this;
            }
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiReadS(_source.syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWriteS(_source.syncMultiWrite, std::defer_lock);
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::lock(lockMultiReadS, lockMultiWriteS, lockMultiRead, lockMultiWrite);
                return assignCopyImpl(_source);
            } else {
                std::unique_lock<std::mutex> lockReadS(_source.syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWriteS(_source.syncWrite, std::defer_lock);
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockReadS, lockWriteS, lockRead, lockWrite);
                return assignCopyImpl(_source);
            }
        }

        ringbuffer& operator=(ringbuffer &&o) noexcept = default;
        /** Enable or disable the capability to handle multiple producers and consumers. Default is single producer, single consumer. */
        constexpr_non_literal_var void setMultiPCEnabled(const bool v) {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
                multi_pc_enabled = v;
            } else {
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockRead, lockWrite);
                multi_pc_enabled = v;
            }
        }

        /** Return whether multiple producers and consumers are enabled. */
        constexpr bool getMultiPCEnabled() const { return multi_pc_enabled; }
        /** Returns the net capacity of this ring buffer. */
        Size_type capacity() const noexcept { return capacityPlusOne - 1; }

        /** Returns true if this ring buffer is empty, otherwise false. */
        bool isEmpty() const noexcept { return writePos == readPos; /* SC-DRF */ }

        /** Returns true if this ring buffer is full, otherwise false. */
        bool isFull() const noexcept {
            return ( writePos + 1 ) % capacityPlusOne == readPos; /* SC-DRF */
        }

        /** Returns the number of elements in this ring buffer. */
        Size_type size() const noexcept {
            const Size_type R = readPos;  // SC-DRF acquire atomic readPos
            const Size_type W = writePos; // SC-DRF acquire atomic writePos
            // W >= R: elements pending in (R .. W]; W < R: write side wrapped around
            return W >= R ? W - R : capacityPlusOne - R + W;
        }

        /** Returns the number of free slots available to put. (Declaration implied by waitForFreeSlotsImpl() above.) */
        Size_type freeSlots() const noexcept { return capacityPlusOne - 1 - size(); }
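        // Worked example for size(): with capacityPlusOne == 8 (net capacity 7),
        // readPos R == 6 and writePos W == 3 the write side has wrapped around, so
        //   size = capacityPlusOne - R + W = 8 - 6 + 3 = 5 pending elements,
        // stored at indices 7, 0, 1, 2 and 3.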
        /** Blocks until at least min_count elements have been put for subsequent get() and getBlocking(). */
        Size_type waitForElements(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occured) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead);
                return waitForElementsImpl(min_count, timeout, timeout_occured);
            } else {
                return waitForElementsImpl(min_count, timeout, timeout_occured);
            }
        }
        /** Blocks until at least min_count free slots become available for subsequent put() and putBlocking(). */
        Size_type waitForFreeSlots(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occured) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite);
                return waitForFreeSlotsImpl(min_count, timeout, timeout_occured);
            } else {
                return waitForFreeSlotsImpl(min_count, timeout, timeout_occured);
            }
        }
        /** Releases all elements available, i.e. size() returns zero afterwards; optionally zeroes the memory. */
        void clear(const bool zeromem=false) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
                if( zeromem ) {
                    clearAndZeroMemImpl();
                } else {
                    clearImpl();
                }
            } else {
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockRead, lockWrite);
                if( zeromem ) {
                    clearAndZeroMemImpl();
                } else {
                    clearImpl();
                }
            }
            cvRead.notify_all(); // notify waiting putter
        }
        /** Close this ringbuffer by releasing all elements available and resizing capacity to zero; unblocks all readers and writers. */
        void close(const bool zeromem=false) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
                closeImpl(zeromem);
                interrupted_read = true;
                interrupted_write = true;
                end_of_input = true;
            } else {
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockRead, lockWrite);
                closeImpl(zeromem);
                interrupted_write = true;
                interrupted_read = true;
                end_of_input = true;
            }
            cvRead.notify_all();  // notify waiting putter
            cvWrite.notify_all(); // notify waiting getter
        }
        /** Clears all elements and adds all copyFrom elements thereafter, as if reconstructing this ringbuffer instance. */
        void reset(const Value_type * copyFrom, const Size_type copyFromCount) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
                resetImpl(copyFrom, copyFromCount);
            } else {
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockRead, lockWrite);
                resetImpl(copyFrom, copyFromCount);
            }
        }
        /** Clears all elements and adds all copyFrom elements thereafter, as if reconstructing this ringbuffer instance. */
        void reset(const std::vector<Value_type> & copyFrom) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
                resetImpl(copyFrom.data(), copyFrom.size());
            } else {
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockRead, lockWrite);
                resetImpl(copyFrom.data(), copyFrom.size());
            }
        }
        /** Resizes this ring buffer's capacity, preserving the current content. */
        void recapacity(const Size_type newCapacity) {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
                recapacityImpl(newCapacity);
            } else {
                std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock);
                std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
                std::lock(lockRead, lockWrite);
                recapacityImpl(newCapacity);
            }
        }
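        // Hedged usage sketch ('rb' and 'burst_len' are hypothetical): growing the net
        // capacity before a known burst of puts, while preserving the current content.
        //
        //   if( rb.freeSlots() < burst_len ) {
        //       rb.recapacity( rb.capacity() + burst_len );
        //   }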
        /** Interrupt a potentially blocked reader once. */
        void interruptReader() noexcept {
            interrupted_read = true;
            cvWrite.notify_all();
        }

        /** Set End of Input from the writer thread, unblocking all read-operations. */
        void set_end_of_input(const bool v=true) noexcept {
            end_of_input = v;
            if( v ) {
                cvWrite.notify_all();
            }
        }

        /** Interrupt a potentially blocked writer once. */
        void interruptWriter() noexcept {
            interrupted_write = true;
            cvRead.notify_all();
        }
        /** Peeks the next element at the read position w/o modifying the read pointer, nor blocking. */
        bool peek(Value_type& result) noexcept {
            bool timeout_occured_dummy;
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead);
                return peekImpl(result, false, fractions_i64::zero, timeout_occured_dummy);
            } else {
                return peekImpl(result, false, fractions_i64::zero, timeout_occured_dummy);
            }
        }
        /** Peeks the next element at the read position w/o modifying the read pointer, but with blocking. */
        bool peekBlocking(Value_type& result, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead);
                return peekImpl(result, true, timeout, timeout_occurred);
            } else {
                return peekImpl(result, true, timeout, timeout_occurred);
            }
        }
        /** Peeks the next element at the read position w/o modifying the read pointer, but with blocking. */
        bool peekBlocking(Value_type& result, const fraction_i64& timeout) noexcept {
            bool timeout_occurred;
            return peekBlocking(result, timeout, timeout_occurred);
        }
        /** Dequeues the oldest enqueued element, if available. */
        bool get(Value_type& result) noexcept {
            bool timeout_occured_dummy;
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead);
                return moveOutImpl(result, false, fractions_i64::zero, timeout_occured_dummy);
            } else {
                return moveOutImpl(result, false, fractions_i64::zero, timeout_occured_dummy);
            }
        }
        /** Dequeues the oldest enqueued element, blocking. */
        bool getBlocking(Value_type& result, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead);
                return moveOutImpl(result, true, timeout, timeout_occurred);
            } else {
                return moveOutImpl(result, true, timeout, timeout_occurred);
            }
        }
        /** Dequeues the oldest enqueued element, blocking. */
        bool getBlocking(Value_type& result, const fraction_i64& timeout) noexcept {
            bool timeout_occurred;
            return getBlocking(result, timeout, timeout_occurred);
        }
        /** Dequeues the oldest enqueued min(dest_len, size()>=min_count) elements by copying them into the given dest array. */
        [[nodiscard]] Size_type get(Value_type *dest, const Size_type dest_len, const Size_type min_count) noexcept {
            bool timeout_occured_dummy;
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead);
                return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero, timeout_occured_dummy);
            } else {
                return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero, timeout_occured_dummy);
            }
        }
        /** Dequeues the oldest enqueued min(dest_len, size()>=min_count) elements by copying them into the given dest array, blocking. */
        Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count,
                              const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead);
                return moveOutImpl(dest, dest_len, min_count, true, timeout, timeout_occurred);
            } else {
                return moveOutImpl(dest, dest_len, min_count, true, timeout, timeout_occurred);
            }
        }
        /** Dequeues the oldest enqueued min(dest_len, size()>=min_count) elements by copying them into the given dest array, blocking. */
        Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count,
                              const fraction_i64& timeout) noexcept {
            bool timeout_occurred;
            return getBlocking(dest, dest_len, min_count, timeout, timeout_occurred);
        }
        /** Drops up to max_count oldest enqueued elements, but may drop less if not available. */
        Size_type drop(const Size_type max_count) noexcept {
            bool timeout_occured_dummy;
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::lock(lockMultiRead, lockMultiWrite);
                return dropImpl(max_count, false, fractions_i64::zero, timeout_occured_dummy);
            } else {
                return dropImpl(max_count, false, fractions_i64::zero, timeout_occured_dummy);
            }
        }
        /** Drops exactly count oldest enqueued elements, blocking until they become available. */
        bool dropBlocking(const Size_type count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock);  // utilize lock-hierarchy
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
                std::lock(lockMultiRead, lockMultiWrite);
                return 0 != dropImpl(count, true, timeout, timeout_occurred);
            } else {
                return 0 != dropImpl(count, true, timeout, timeout_occurred);
            }
        }
        /** Drops exactly count oldest enqueued elements, blocking until they become available. */
        bool dropBlocking(const Size_type count, const fraction_i64& timeout) noexcept {
            bool timeout_occurred;
            return dropBlocking(count, timeout, timeout_occurred);
        }
        /** Enqueues the given element by moving it into this ringbuffer storage, non-blocking. */
        bool put(Value_type &&e) noexcept {
            bool timeout_occured_dummy;
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite);
                return moveIntoImpl(std::move(e), false, fractions_i64::zero, timeout_occured_dummy);
            } else {
                return moveIntoImpl(std::move(e), false, fractions_i64::zero, timeout_occured_dummy);
            }
        }
        /** Enqueues the given element by moving it into this ringbuffer storage, blocking. */
        bool putBlocking(Value_type &&e, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite);
                return moveIntoImpl(std::move(e), true, timeout, timeout_occurred);
            } else {
                return moveIntoImpl(std::move(e), true, timeout, timeout_occurred);
            }
        }
        /** Enqueues the given element by moving it into this ringbuffer storage, blocking. */
        bool putBlocking(Value_type &&e, const fraction_i64& timeout) noexcept {
            bool timeout_occurred;
            return putBlocking(std::move(e), timeout, timeout_occurred);
        }
        /** Enqueues the given element by copying it into this ringbuffer storage, non-blocking. */
        bool put(const Value_type &e) noexcept {
            bool timeout_occured_dummy;
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite);
                return copyIntoImpl(e, false, fractions_i64::zero, timeout_occured_dummy);
            } else {
                return copyIntoImpl(e, false, fractions_i64::zero, timeout_occured_dummy);
            }
        }
        /** Enqueues the given element by copying it into this ringbuffer storage, blocking. */
        bool putBlocking(const Value_type &e, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite);
                return copyIntoImpl(e, true, timeout, timeout_occurred);
            } else {
                return copyIntoImpl(e, true, timeout, timeout_occurred);
            }
        }
        /** Enqueues the given element by copying it into this ringbuffer storage, blocking. */
        bool putBlocking(const Value_type &e, const fraction_i64& timeout) noexcept {
            bool timeout_occurred;
            return putBlocking(e, timeout, timeout_occurred);
        }
        /** Enqueues the given range of consecutive elements by copying it into this ringbuffer storage, non-blocking. */
        bool put(const Value_type *first, const Value_type *last) noexcept {
            bool timeout_occured_dummy;
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite);
                return copyIntoImpl(first, last, false, fractions_i64::zero, timeout_occured_dummy);
            } else {
                return copyIntoImpl(first, last, false, fractions_i64::zero, timeout_occured_dummy);
            }
        }
        /** Enqueues the given range of consecutive elements by copying it into this ringbuffer storage, blocking. */
        bool putBlocking(const Value_type *first, const Value_type *last, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
            if( multi_pc_enabled ) {
                std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite);
                return copyIntoImpl(first, last, true, timeout, timeout_occurred);
            } else {
                return copyIntoImpl(first, last, true, timeout, timeout_occurred);
            }
        }
        /** Enqueues the given range of consecutive elements by copying it into this ringbuffer storage, blocking. */
        bool putBlocking(const Value_type *first, const Value_type *last, const fraction_i64& timeout) noexcept {
            bool timeout_occurred;
            return putBlocking(first, last, timeout, timeout_occurred);
        }
};

} /* namespace jau */

#endif /* JAU_RINGBUFFER_HPP_ */
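/*
 * Hedged usage sketch (illustrative only, not part of the header): a single producer
 * and single consumer exchanging integers, following the API declared above.
 *
 *   jau::ringbuffer<int, size_t> rb(10);   // empty ring buffer, net capacity 10
 *   rb.put(42);                            // enqueue, lock-free in single-PC mode
 *   int v;
 *   if( rb.get(v) ) {                      // dequeue the oldest element, if available
 *       // v == 42
 *   }
 *   rb.close();                            // release all elements, unblock any waiters
 */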