Direct-BT v3.3.0-1-gc2d430c
Direct-BT - Direct Bluetooth Programming.
ringbuffer.hpp
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2020-2023 Gothel Software e.K.
4 * Copyright (c) 2020 ZAFENA AB
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25
26#ifndef JAU_RINGBUFFER_HPP_
27#define JAU_RINGBUFFER_HPP_
28
29#include <type_traits>
30#include <atomic>
31#include <memory>
32#include <mutex>
33#include <condition_variable>
34#include <chrono>
35#include <algorithm>
36
37#include <cstring>
38#include <string>
39#include <cstdint>
40
41#include <jau/debug.hpp>
42#include <jau/basic_types.hpp>
43#include <jau/ordered_atomic.hpp>
44#include <jau/fraction_type.hpp>
45#include <jau/callocator.hpp>
46
47namespace jau {
48
49#if 0
50 #define _DEBUG_DUMP(...) { dump(stderr, __VA_ARGS__); }
51 #define _DEBUG_DUMP2(a, ...) { a.dump(stderr, __VA_ARGS__); }
52 #define _DEBUG_PRINT(...) { fprintf(stderr, __VA_ARGS__); }
53#else
54 #define _DEBUG_DUMP(...)
55 #define _DEBUG_DUMP2(a, ...)
56 #define _DEBUG_PRINT(...)
57#endif
58
59/** @defgroup DataStructs Data Structures
60 * Data structures, notably
61 * - \ref ringbuffer
62 * - \ref darray
63 * - cow_darray
64 * - cow_vector
65 *
66 * @{
67 */
68
69/**
70 * Ring buffer implementation, a.k.a. circular buffer,
71 * exposing <i>lock-free</i>
72 * {@link #get() get*(..)} and {@link #put() put*(..)} methods.
73 *
74 * This data structure also supports \ref Concurrency.
75 *
76 * Implementation utilizes the <i>Always Keep One Slot Open</i> scheme,
77 * hence it maintains an internal array of `capacity` <i>plus one</i>.
78 *
79 * ### Characteristics
80 * - Read position points to the last read element.
81 * - Write position points to the last written element.
82 *
83 * <table border="1">
84 * <tr><td>Empty</td><td>writePos == readPos</td><td>size == 0</td></tr>
85 * <tr><td>Full</td><td>writePos == readPos - 1</td><td>size == capacity</td></tr>
86 * </table>
87 * <pre>
88 * Empty [RW][][ ][ ][ ][ ][ ][ ] ; W==R
89 * Avail [ ][ ][R][.][.][.][.][W] ; W > R
90 * Avail [.][.][.][W][ ][ ][R][.] ; W < R - 1
91 * Full [.][.][.][.][.][W][R][.] ; W==R-1
92 * </pre>
93 *
94 *
95 * ### Thread Safety
96 * Thread safety is guaranteed, considering the mode of operation as described below.
97 *
98 * @anchor ringbuffer_single_pc
99 * #### One producer-thread and one consumer-thread
100 * Expects one producer-thread at a time and one consumer-thread at a time concurrently.
101 * Threads can be different or the same.
102 *
103 * This is the default mode with the least std::mutex operations.
104 * - Only a blocking producer put() and consumer get() waiting for
105 * free slots or available data will utilize a lock and wait for the corresponding operation.
106 * - Otherwise the implementation is <i>lock-free</i> and relies on SC-DRF via atomic memory barriers.
107 *
108 * See setMultiPCEnabled().
109 *
110 * Implementation is thread safe if:
111 * - {@link #put() put*(..)} operations are performed by one producer-thread at a time,
112 * - {@link #get() get*(..)} operations are performed by one consumer-thread at a time, and
113 * - {@link #put() put*(..)} producer and {@link #get() get*(..)} consumer threads can be different or the same.
114 *
115 * @anchor ringbuffer_multi_pc
116 * #### Multiple producer-threads and multiple consumer-threads
117 * Expects multiple producer-threads and multiple consumer-threads concurrently.
118 * Threads can be different or the same.
119 *
120 * This operation mode utilizes a specific multi-producer and -consumer lock,
121 * synchronizing {@link #put() put*(..)} and {@link #get() get*(..)} operations separately.
122 *
123 * Use setMultiPCEnabled() to enable or disable multiple producer and consumer mode.
124 *
125 * Implementation is thread safe if:
126 * - {@link #put() put*(..)} operations are performed concurrently from multiple threads,
127 * - {@link #get() get*(..)} operations are performed concurrently from multiple threads, and
128 * - {@link #put() put*(..)} producer and {@link #get() get*(..)} consumer threads can be different or the same.
129 *
130 * #### Interruption of Consumer and Producer
131 * To allow an application to unblock a potentially blocked producer (writer)
132 * or consumer (reader) thread once,
133 * one can call interruptWriter() or interruptReader() respectively.
134 *
135 * #### Marking End of Input Stream (EOS)
136 * To allow an application to mark the end of input stream,
137 * i.e. the producer (writer) has completed filling the ringbuffer,
138 * one can call set_end_of_input().
139 *
140 * Calling set_end_of_input(true) will unblock all read-operations from this point onwards.
141 * A potentially currently blocked reader thread is also interrupted and hence unblocked.
142 *
143 * #### See also
144 * - Sequentially Consistent (SC) ordering or SC-DRF (data race free) <https://en.cppreference.com/w/cpp/atomic/memory_order#Sequentially-consistent_ordering>
145 * - std::memory_order <https://en.cppreference.com/w/cpp/atomic/memory_order>
146 * - jau::sc_atomic_critical
147 * - setMultiPCEnabled()
148 * - interruptReader()
149 * - interruptWriter()
150 * - set_end_of_input()
151 *
152 * @anchor ringbuffer_ntt_params
153 * ### Non-Type Template Parameter (NTTP) controlling Value_type memory
154 * See @ref darray_ntt_params.
155 *
156 * #### use_memmove
157 * `use_memmove` see @ref darray_memmove.
158 *
159 * #### use_memcpy
160 * `use_memcpy` has stricter requirements than `use_memmove`,
161 * i.e. it strictly relies on `Value_type` satisfying `std::is_trivially_copyable_v<Value_type>`.
162 *
163 * It allows using mere memory operations without the need for constructor or destructor calls.
164 *
165 * See [Trivial destructor](https://en.cppreference.com/w/cpp/language/destructor#Trivial_destructor)
166 * being key requirement to [TriviallyCopyable](https://en.cppreference.com/w/cpp/named_req/TriviallyCopyable).
167 * > A trivial destructor is a destructor that performs no action.
168 * > Objects with trivial destructors don't require a delete-expression and may be disposed of by simply deallocating their storage.
169 * > All data types compatible with the C language (POD types) are trivially destructible.
170 *
171 * #### use_secmem
172 * `use_secmem` see @ref darray_secmem.
173 *
174 * @see @ref darray_ntt_params
175 * @see jau::sc_atomic_critical
176 */
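// Editorial usage sketch, not part of the original header: minimal single-producer /
// single-consumer operation per @ref ringbuffer_single_pc above. It assumes the
// bool-returning put(const Value_type&) and getBlocking(Value_type&, const fraction_i64&)
// members declared further below in this file.
//
//   #include <jau/ringbuffer.hpp>
//
//   void spsc_example() {
//       jau::ringbuffer<uint8_t, jau::nsize_t> rb(10); // net capacity 10, 10+1 slots internally
//
//       // producer-thread: non-blocking put, returns false if the buffer is full
//       uint8_t in = 0x2a;
//       if( !rb.put(in) ) { /* handle back-pressure */ }
//
//       // consumer-thread: blocking get, jau::fractions_i64::zero waits infinitely
//       uint8_t out;
//       if( rb.getBlocking(out, jau::fractions_i64::zero) ) { /* use out */ }
//   }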
177template <typename Value_type, typename Size_type,
178 bool use_memmove = std::is_trivially_copyable_v<Value_type> || is_container_memmove_compliant_v<Value_type>,
179 bool use_memcpy = std::is_trivially_copyable_v<Value_type>,
180 bool use_secmem = is_enforcing_secmem_v<Value_type>
181 >
182class ringbuffer {
183 public:
184 constexpr static const bool uses_memmove = use_memmove;
185 constexpr static const bool uses_memcpy = use_memcpy;
186 constexpr static const bool uses_secmem = use_secmem;
187
188 // typedefs for C++ named requirements: Container (ex. iterator)
189
190 typedef Value_type value_type;
191 typedef value_type* pointer;
192 typedef const value_type* const_pointer;
193 typedef value_type& reference;
194 typedef const value_type& const_reference;
195 typedef Size_type size_type;
196 typedef typename std::make_signed<size_type>::type difference_type;
197
198 typedef jau::callocator<Value_type> allocator_type;
199
200 private:
201 constexpr static const bool is_integral = std::is_integral_v<Value_type>;
202
203 typedef std::remove_const_t<Value_type> value_type_mutable;
204 /** Required to create and move immutable elements, aka const */
205 typedef value_type_mutable* pointer_mutable;
206
207 static constexpr void* voidptr_cast(const_pointer p) { return reinterpret_cast<void*>( const_cast<pointer_mutable>( p ) ); }
208
209 /** SC atomic integral scalar jau::nsize_t. Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write) */
210 typedef ordered_atomic<Size_type, std::memory_order_seq_cst> sc_atomic_Size_type;
211
212 /** Relaxed non-SC atomic integral scalar jau::nsize_t. Memory-Model (MM) only guarantees the atomic value, _no_ sequential consistency (SC) between acquire (read) and release (write). */
213 typedef ordered_atomic<Size_type, std::memory_order_relaxed> relaxed_atomic_Size_type;
214
215 /**
216 * Flagging whether multiple-producer and -consumer are enabled,
217 * see @ref ringbuffer_multi_pc and @ref ringbuffer_single_pc.
218 *
219 * Defaults to `false`.
220 */
221 bool multi_pc_enabled = false;
222
223 /** synchronizes write-operations (put*), i.e. modifying the writePos. */
224 mutable std::mutex syncWrite, syncMultiWrite; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire and release
225 std::condition_variable cvWrite;
226
227 /** synchronizes read-operations (get*), i.e. modifying the readPos. */
228 mutable std::mutex syncRead, syncMultiRead; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire and release
229 std::condition_variable cvRead;
230
231 jau::relaxed_atomic_bool interrupted_read = false;
232 jau::relaxed_atomic_bool interrupted_write = false;
233 jau::relaxed_atomic_bool end_of_input = false;
234
235 allocator_type alloc_inst;
236
237 /* const */ Size_type capacityPlusOne; // not final due to grow
238 /* const */ Value_type * array; // Synchronized due to MM's data-race-free SC (SC-DRF) between [atomic] acquire/release
239 sc_atomic_Size_type readPos; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write)
240 sc_atomic_Size_type writePos; // ditto
241
242 constexpr Value_type * newArray(const Size_type count) noexcept {
243 if( 0 < count ) {
244 value_type * m = alloc_inst.allocate(count);
245 if( nullptr == m ) {
246 // Avoid exception, abort!
247 ABORT("Error: bad_alloc: alloc %zu elements * %zu bytes/element = %zu bytes failed",
248 count, sizeof(value_type), (count * sizeof(value_type)));
249 }
250 _DEBUG_DUMP("newArray ...");
251 _DEBUG_PRINT("newArray %" PRIu64 "\n", count);
252 return m;
253 } else {
254 _DEBUG_DUMP("newArray ...");
255 _DEBUG_PRINT("newArray %" PRIu64 "\n", count);
256 return nullptr;
257 }
258 }
259
260 constexpr void freeArray(Value_type ** a, const Size_type count) noexcept {
261 _DEBUG_DUMP("freeArray(def)");
262 _DEBUG_PRINT("freeArray %p\n", *a);
263 if( nullptr != *a ) {
264 alloc_inst.deallocate(*a, count);
265 *a = nullptr;
266 } else {
267 ABORT("ringbuffer::freeArray with nullptr");
268 }
269 }
270
271 constexpr void dtor_one(const Size_type pos) {
272 ( array + pos )->~value_type(); // placement new -> manual destruction!
273 if constexpr ( uses_secmem ) {
274 ::explicit_bzero(voidptr_cast(array + pos), sizeof(value_type));
275 }
276 }
277 constexpr void dtor_one(pointer elem) {
278 ( elem )->~value_type(); // placement new -> manual destruction!
279 if constexpr ( uses_secmem ) {
280 ::explicit_bzero(voidptr_cast(elem), sizeof(value_type));
281 }
282 }
283
284 Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
285 timeout_occurred = false;
286 Size_type available = size();
287 if( available < min_count && min_count < capacityPlusOne && !end_of_input ) {
288 interrupted_read = false;
289 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
290 available = size();
291 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
292 while( !interrupted_read && !end_of_input && min_count > available ) {
293 if( fractions_i64::zero == timeout ) {
294 cvWrite.wait(lockWrite);
295 available = size();
296 } else {
297 std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
298 available = size();
299 if( std::cv_status::timeout == s && min_count > available ) {
300 timeout_occurred = true;
301 return available;
302 }
303 }
304 }
305 if( interrupted_read ) { // interruption or end_of_input may happen after delivering last data chunk
306 interrupted_read = false;
307 }
308 }
309 return available;
310 }
311
312 Size_type waitForFreeSlotsImpl(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
313 timeout_occurred = false;
314 Size_type available = freeSlots();
315 if( min_count > available && min_count < capacityPlusOne ) {
316 interrupted_write = false;
317 std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
318 available = freeSlots();
319 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
320 while( !interrupted_write && min_count > available ) {
321 if( fractions_i64::zero == timeout ) {
322 cvRead.wait(lockRead);
323 available = freeSlots();
324 } else {
325 std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
326 available = freeSlots();
327 if( std::cv_status::timeout == s && min_count > available ) {
328 timeout_occurred = true;
329 return available;
330 }
331 }
332 }
333 if( interrupted_write ) {
334 interrupted_write = false;
335 }
336 }
337 return available;
338 }
339
340 /**
341 * clear all elements, zero size.
342 *
343 * Moves readPos == writePos compatible with put*() when waiting for available
344 */
345 constexpr void clearImpl() noexcept {
346 const Size_type size_ = size();
347 if( 0 < size_ ) {
348 if constexpr ( use_memcpy ) {
349 if constexpr ( uses_secmem ) {
350 ::explicit_bzero(voidptr_cast(&array[0]), capacityPlusOne*sizeof(Value_type));
351 }
352 readPos = writePos.load();
353 } else {
354 Size_type localReadPos = readPos;
355 for(Size_type i=0; i<size_; i++) {
356 localReadPos = (localReadPos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
357 ( array + localReadPos )->~value_type(); // placement new -> manual destruction!
358 }
359 if( writePos != localReadPos ) {
360 // Avoid exception, abort!
361 ABORT("copy segment error: this %s, readPos %d/%d; writePos %d", toString().c_str(), readPos.load(), localReadPos, writePos.load());
362 }
363 if constexpr ( uses_secmem ) {
364 ::explicit_bzero(voidptr_cast(&array[0]), capacityPlusOne*sizeof(Value_type));
365 }
366 readPos = localReadPos;
367 }
368 }
369 }
370
371 constexpr void clearAndZeroMemImpl() noexcept {
372 if constexpr ( use_memcpy ) {
373 ::explicit_bzero(voidptr_cast(&array[0]), capacityPlusOne*sizeof(Value_type));
374 readPos = writePos.load();
375 } else {
376 const Size_type size_ = size();
377 Size_type localReadPos = readPos;
378 for(Size_type i=0; i<size_; i++) {
379 localReadPos = (localReadPos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
380 ( array + localReadPos )->~value_type(); // placement new -> manual destruction!
381 }
382 if( writePos != localReadPos ) {
383 // Avoid exception, abort!
384 ABORT("copy segment error: this %s, readPos %d/%d; writePos %d", toString().c_str(), readPos.load(), localReadPos, writePos.load());
385 }
386 ::explicit_bzero(voidptr_cast(&array[0]), capacityPlusOne*sizeof(Value_type));
387 readPos = localReadPos;
388 }
389 }
390
391 void cloneFrom(const bool allocArrayAndCapacity, const ringbuffer & source) noexcept {
392 if( allocArrayAndCapacity ) {
393 if( nullptr != array ) {
394 clearImpl();
395 freeArray(&array, capacityPlusOne);
396 }
397 capacityPlusOne = source.capacityPlusOne;
398 array = newArray(capacityPlusOne);
399 } else if( capacityPlusOne != source.capacityPlusOne ) {
400 ABORT( ("capacityPlusOne not equal: this "+toString()+", source "+source.toString() ).c_str() );
401 } else {
402 clearImpl();
403 }
404
405 readPos = source.readPos.load();
406 writePos = source.writePos.load();
407
408 if constexpr ( uses_memcpy ) {
409 ::memcpy(voidptr_cast(&array[0]),
410 &source.array[0],
411 capacityPlusOne*sizeof(Value_type));
412 } else {
413 const Size_type size_ = size();
414 Size_type localWritePos = readPos;
415 for(Size_type i=0; i<size_; i++) {
416 localWritePos = (localWritePos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
417 new (const_cast<pointer_mutable>(array + localWritePos)) value_type( source.array[localWritePos] ); // placement new
418 }
419 if( writePos != localWritePos ) {
420 ABORT( ("copy segment error: this "+toString()+", localWritePos "+std::to_string(localWritePos)+"; source "+source.toString()).c_str() );
421 }
422 }
423 }
424
425 ringbuffer& assignCopyImpl(const ringbuffer &_source) noexcept {
426 if( this == &_source ) {
427 return *this;
428 }
429
430 if( capacityPlusOne != _source.capacityPlusOne ) {
431 cloneFrom(true, _source);
432 } else {
433 cloneFrom(false, _source);
434 }
435 _DEBUG_DUMP("assignment(copy.this)");
436 _DEBUG_DUMP2(_source, "assignment(copy.source)");
437 return *this;
438 }
439
440 void resetImpl(const Value_type * copyFrom, const Size_type copyFromCount) noexcept {
441 // fill with copyFrom elements
442 if( nullptr != copyFrom && 0 < copyFromCount ) {
443 if( copyFromCount > capacityPlusOne-1 ) {
444 // new blank resized array
445 if( nullptr != array ) {
446 clearImpl();
447 freeArray(&array, capacityPlusOne);
448 }
449 capacityPlusOne = copyFromCount + 1;
450 array = newArray(capacityPlusOne);
451 readPos = 0;
452 writePos = 0;
453 } else {
454 clearImpl();
455 }
456 if constexpr ( uses_memcpy ) {
457 ::memcpy(voidptr_cast(&array[0]),
458 copyFrom,
459 copyFromCount*sizeof(Value_type));
460 readPos = capacityPlusOne - 1; // last read-pos
461 writePos = copyFromCount - 1; // last write-pos
462 } else {
463 Size_type localWritePos = writePos;
464 for(Size_type i=0; i<copyFromCount; i++) {
465 localWritePos = (localWritePos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
466 new (const_cast<pointer_mutable>(array + localWritePos)) value_type( copyFrom[i] ); // placement new
467 }
468 writePos = localWritePos;
469 }
470 } else {
471 clearImpl();
472 }
473 }
474
475 bool peekImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
476 timeout_occurred = false;
477 if( !std::is_copy_constructible_v<Value_type> ) {
478 ABORT("Value_type is not copy constructible");
479 return false;
480 }
481 if( 1 >= capacityPlusOne ) {
482 return false;
483 }
484 const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
485 Size_type localReadPos = oldReadPos;
486 if( localReadPos == writePos ) {
487 if( blocking && !end_of_input ) {
488 interrupted_read = false;
489 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
490 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
491 while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
492 if( fractions_i64::zero == timeout ) {
493 cvWrite.wait(lockWrite);
494 } else {
495 std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
496 if( std::cv_status::timeout == s && localReadPos == writePos ) {
497 timeout_occurred = true;
498 return false;
499 }
500 }
501 }
502 if( interrupted_read || end_of_input ) {
503 interrupted_read = false;
504 if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk
505 return false;
506 }
507 }
508 } else {
509 return false;
510 }
511 }
512 localReadPos = (localReadPos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
513 if constexpr ( !is_integral && uses_memmove ) {
514 // must not dtor after memcpy; memcpy OK, not overlapping
515 ::memcpy(voidptr_cast(&dest),
516 &array[localReadPos],
517 sizeof(Value_type));
518 } else {
519 dest = array[localReadPos];
520 }
521 readPos = oldReadPos; // SC-DRF release atomic readPos (complete acquire-release even @ peek)
522 return true;
523 }
524
525 bool moveOutImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
526 timeout_occurred = false;
527 if( 1 >= capacityPlusOne ) {
528 return false;
529 }
530 const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
531 Size_type localReadPos = oldReadPos;
532 if( localReadPos == writePos ) {
533 if( blocking && !end_of_input ) {
534 interrupted_read = false;
535 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
536 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
537 while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
538 if( fractions_i64::zero == timeout ) {
539 cvWrite.wait(lockWrite);
540 } else {
541 std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
542 if( std::cv_status::timeout == s && localReadPos == writePos ) {
543 timeout_occurred = true;
544 return false;
545 }
546 }
547 }
548 if( interrupted_read || end_of_input ) {
549 interrupted_read = false;
550 if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk
551 return false;
552 }
553 }
554 } else {
555 return false;
556 }
557 }
558 localReadPos = (localReadPos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
559 if constexpr ( is_integral ) {
560 dest = array[localReadPos];
561 if constexpr ( uses_secmem ) {
562 ::explicit_bzero(voidptr_cast(&array[localReadPos]), sizeof(Value_type));
563 }
564 } else if constexpr ( uses_memmove ) {
565 // must not dtor after memcpy; memcpy OK, not overlapping
566 ::memcpy(voidptr_cast(&dest),
567 &array[localReadPos],
568 sizeof(Value_type));
569 if constexpr ( uses_secmem ) {
570 ::explicit_bzero(voidptr_cast(&array[localReadPos]), sizeof(Value_type));
571 }
572 } else {
573 dest = std::move( array[localReadPos] );
574 dtor_one( localReadPos );
575 }
576 {
577 std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
578 readPos = localReadPos; // SC-DRF release atomic readPos
579 }
580 cvRead.notify_all(); // notify waiting putter, have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
581 return true;
582 }
583
584 Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
585 timeout_occurred = false;
586 const Size_type min_count = std::min(dest_len, min_count_);
587 Value_type *iter_out = dest;
588
589 if( min_count >= capacityPlusOne ) {
590 return 0;
591 }
592 if( 0 == min_count ) {
593 return 0;
594 }
595
596 const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
597 Size_type localReadPos = oldReadPos;
598 Size_type available = size();
599 if( min_count > available ) {
600 if( blocking && !end_of_input ) {
601 interrupted_read = false;
602 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
603 available = size();
604 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
605 while( !interrupted_read && !end_of_input && min_count > available ) {
606 if( fractions_i64::zero == timeout ) {
607 cvWrite.wait(lockWrite);
608 available = size();
609 } else {
610 std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
611 available = size();
612 if( std::cv_status::timeout == s && min_count > available ) {
613 timeout_occurred = true;
614 return 0;
615 }
616 }
617 }
618 if( interrupted_read || end_of_input ) {
619 interrupted_read = false;
620 if( min_count > available ) { // interruption or end_of_input may happen after delivering last data chunk
621 return 0;
622 }
623 }
624 } else {
625 return 0;
626 }
627 }
628 const Size_type count = std::min(dest_len, available);
629
630 /**
631 * Empty [RW][][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ] ; W==R
632 * Avail [ ][ ][R][.][.][.][.][W][ ][ ][ ][ ][ ][ ][ ] ; W > R
633 * Avail [.][.][.][W][ ][ ][R][.][.][.][.][.][.][.][.] ; W < R - 1
634 * Full [.][.][.][.][.][W][R][.][.][.][.][.][.][.][.] ; W==R-1
635 */
636 // Since available > 0, we can exclude Empty case.
637 Size_type togo_count = count;
638 const Size_type localWritePos = writePos;
639 if( localReadPos > localWritePos ) {
640 // we have a tail
641 localReadPos = ( localReadPos + 1 ) % capacityPlusOne; // next-read-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
642 const Size_type tail_count = std::min(togo_count, capacityPlusOne - localReadPos);
643 if constexpr ( uses_memmove ) {
644 // must not dtor after memcpy; memcpy OK, not overlapping
645 ::memcpy(voidptr_cast(iter_out),
646 &array[localReadPos],
647 tail_count*sizeof(Value_type));
648 if constexpr ( uses_secmem ) {
649 ::explicit_bzero(voidptr_cast(&array[localReadPos]), tail_count*sizeof(Value_type));
650 }
651 } else {
652 for(Size_type i=0; i<tail_count; i++) {
653 iter_out[i] = std::move( array[localReadPos+i] );
654 dtor_one( localReadPos + i ); // manual destruction, even after std::move (object still exists)
655 }
656 }
657 localReadPos = ( localReadPos + tail_count - 1 ) % capacityPlusOne; // last read-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
658 togo_count -= tail_count;
659 iter_out += tail_count;
660 }
661 if( togo_count > 0 ) {
662 // we have a head
663 localReadPos = ( localReadPos + 1 ) % capacityPlusOne; // next-read-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
664 if constexpr ( uses_memmove ) {
665 // must not dtor after memcpy; memcpy OK, not overlapping
666 ::memcpy(voidptr_cast(iter_out),
667 &array[localReadPos],
668 togo_count*sizeof(Value_type));
669 if constexpr ( uses_secmem ) {
670 ::explicit_bzero(voidptr_cast(&array[localReadPos]), togo_count*sizeof(Value_type));
671 }
672 } else {
673 for(Size_type i=0; i<togo_count; i++) {
674 iter_out[i] = std::move( array[localReadPos+i] );
675 dtor_one( localReadPos + i ); // manual destruction, even after std::move (object still exists)
676 }
677 }
678 localReadPos = ( localReadPos + togo_count - 1 ) % capacityPlusOne; // last read-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
679 }
680 {
681 std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
682 readPos = localReadPos; // SC-DRF release atomic readPos
683 }
684 cvRead.notify_all(); // notify waiting putter, have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
685 return count;
686 }
687
688 Size_type dropImpl (Size_type count, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
689 timeout_occurred = false;
690 if( count >= capacityPlusOne ) {
691 if( blocking ) {
692 return 0;
693 }
694 count = capacityPlusOne-1; // claim theoretical maximum for non-blocking
695 }
696 if( 0 == count ) {
697 return 0;
698 }
699
700 const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
701 Size_type localReadPos = oldReadPos;
702 Size_type available = size();
703 if( count > available ) {
704 if( blocking && !end_of_input ) {
705 interrupted_read = false;
706 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
707 available = size();
708 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
709 while( !interrupted_read && !end_of_input && count > available ) {
710 if( fractions_i64::zero == timeout ) {
711 cvWrite.wait(lockWrite);
712 available = size();
713 } else {
714 std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
715 available = size();
716 if( std::cv_status::timeout == s && count > available ) {
717 timeout_occurred = true;
718 return 0;
719 }
720 }
721 }
722 if( interrupted_read || end_of_input ) {
723 interrupted_read = false;
724 if( count > available ) { // interruption or end_of_input may happen after delivering last data chunk
725 return 0;
726 }
727 }
728 } else {
729 count = available; // drop all available for non-blocking
730 }
731 }
732 /**
733 * Empty [RW][][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ] ; W==R
734 * Avail [ ][ ][R][.][.][.][.][W][ ][ ][ ][ ][ ][ ][ ] ; W > R
735 * Avail [.][.][.][W][ ][ ][R][.][.][.][.][.][.][.][.] ; W < R - 1
736 * Full [.][.][.][.][.][W][R][.][.][.][.][.][.][.][.] ; W==R-1
737 */
738 // Since available > 0, we can exclude Empty case.
739 Size_type togo_count = count;
740 const Size_type localWritePos = writePos;
741 if( localReadPos > localWritePos ) {
742 // we have a tail
743 localReadPos = ( localReadPos + 1 ) % capacityPlusOne; // next-read-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
744 const Size_type tail_count = std::min(togo_count, capacityPlusOne - localReadPos);
745 if constexpr ( uses_memcpy ) {
746 if constexpr ( uses_secmem ) {
747 ::explicit_bzero(voidptr_cast(&array[localReadPos]), tail_count*sizeof(Value_type));
748 }
749 } else {
750 for(Size_type i=0; i<tail_count; i++) {
751 dtor_one( localReadPos+i );
752 }
753 }
754 localReadPos = ( localReadPos + tail_count - 1 ) % capacityPlusOne; // last read-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
755 togo_count -= tail_count;
756 }
757 if( togo_count > 0 ) {
758 // we have a head
759 localReadPos = ( localReadPos + 1 ) % capacityPlusOne; // next-read-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
760 if constexpr ( uses_memcpy ) {
761 if constexpr ( uses_secmem ) {
762 ::explicit_bzero(voidptr_cast(&array[localReadPos]), togo_count*sizeof(Value_type));
763 }
764 } else {
765 for(Size_type i=0; i<togo_count; i++) {
766 dtor_one( localReadPos+i );
767 }
768 }
769 localReadPos = ( localReadPos + togo_count - 1 ) % capacityPlusOne; // last read-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
770 }
771 {
772 std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
773 readPos = localReadPos; // SC-DRF release atomic readPos
774 }
775 cvRead.notify_all(); // notify waiting putter, have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
776 return count;
777 }
778
779 bool moveIntoImpl(Value_type &&e, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
780 timeout_occurred = false;
781 if( 1 >= capacityPlusOne ) {
782 return false;
783 }
784 Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
785 localWritePos = (localWritePos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
786 if( localWritePos == readPos ) {
787 if( blocking ) {
788 interrupted_write = false;
789 std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
790 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
791 while( !interrupted_write && localWritePos == readPos ) {
792 if( fractions_i64::zero == timeout ) {
793 cvRead.wait(lockRead);
794 } else {
795 std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
796 if( std::cv_status::timeout == s && localWritePos == readPos ) {
797 timeout_occurred = true;
798 return false;
799 }
800 }
801 }
802 if( interrupted_write ) {
803 interrupted_write = false;
804 return false;
805 }
806 } else {
807 return false;
808 }
809 }
810 if constexpr ( is_integral ) {
811 array[localWritePos] = e;
812 } else if constexpr ( uses_memcpy ) {
813 ::memcpy(voidptr_cast(&array[localWritePos]),
814 &e,
815 sizeof(Value_type));
816 } else {
817 new (const_cast<pointer_mutable>(array + localWritePos)) value_type( std::move(e) ); // placement new
818 }
819 {
820 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
821 writePos = localWritePos; // SC-DRF release atomic writePos
822 }
823 cvWrite.notify_all(); // notify waiting getter, have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
824 return true;
825 }
826
827 bool copyIntoImpl(const Value_type &e, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
828 timeout_occurred = false;
829 if( !std::is_copy_constructible_v<Value_type> ) {
830 ABORT("Value_type is not copy constructible");
831 return false;
832 }
833 if( 1 >= capacityPlusOne ) {
834 return false;
835 }
836 Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
837 localWritePos = (localWritePos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
838 if( localWritePos == readPos ) {
839 if( blocking ) {
840 interrupted_write = false;
841 std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
842 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
843 while( !interrupted_write && localWritePos == readPos ) {
844 if( fractions_i64::zero == timeout ) {
845 cvRead.wait(lockRead);
846 } else {
847 std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
848 if( std::cv_status::timeout == s && localWritePos == readPos ) {
849 timeout_occurred = true;
850 return false;
851 }
852 }
853 }
854 if( interrupted_write ) {
855 interrupted_write = false;
856 return false;
857 }
858 } else {
859 return false;
860 }
861 }
862 if constexpr ( is_integral ) {
863 array[localWritePos] = e;
864 } else if constexpr ( uses_memcpy ) {
865 ::memcpy(voidptr_cast(&array[localWritePos]),
866 &e,
867 sizeof(Value_type));
868 } else {
869 new (const_cast<pointer_mutable>(array + localWritePos)) value_type( e ); // placement new
870 }
871 {
872 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
873 writePos = localWritePos; // SC-DRF release atomic writePos
874 }
875 cvWrite.notify_all(); // notify waiting getter, have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
876 return true;
877 }
878
879 bool copyIntoImpl(const Value_type *first, const Value_type* last, const bool blocking, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
880 timeout_occurred = false;
881 if( !std::is_copy_constructible_v<Value_type> ) {
882 ABORT("Value_type is not copy constructible");
883 return false;
884 }
885 const Value_type *iter_in = first;
886 const Size_type total_count = last - first;
887
888 if( total_count >= capacityPlusOne ) {
889 return false;
890 }
891 if( 0 == total_count ) {
892 return true;
893 }
894
895 Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
896 Size_type available = freeSlots();
897 if( total_count > available ) {
898 if( blocking ) {
899 interrupted_write = false;
900 std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
901 available = freeSlots();
902 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
903 while( !interrupted_write && total_count > available ) {
904 if( fractions_i64::zero == timeout ) {
905 cvRead.wait(lockRead);
906 available = freeSlots();
907 } else {
908 std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
909 available = freeSlots();
910 if( std::cv_status::timeout == s && total_count > available ) {
911 timeout_occurred = true;
912 return false;
913 }
914 }
915 }
916 if( interrupted_write ) {
917 interrupted_write = false;
918 return false;
919 }
920 } else {
921 return false;
922 }
923 }
924 /**
925 * Empty [RW][][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ] ; W==R
926 * Avail [ ][ ][R][.][.][.][.][W][ ][ ][ ][ ][ ][ ][ ] ; W > R
927 * Avail [.][.][.][W][ ][ ][R][.][.][.][.][.][.][.][.] ; W < R - 1
928 * Full [.][.][.][.][.][W][R][.][.][.][.][.][.][.][.] ; W==R-1
929 */
930 // Since available > 0, we can exclude Full case.
931 Size_type togo_count = total_count;
932 const Size_type localReadPos = readPos;
933 if( localWritePos >= localReadPos ) { // Empty at any position or W > R case
934 // we have a tail
935 localWritePos = ( localWritePos + 1 ) % capacityPlusOne; // next-write-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
936 const Size_type tail_count = std::min(togo_count, capacityPlusOne - localWritePos);
937 if constexpr ( uses_memcpy ) {
938 ::memcpy(voidptr_cast(&array[localWritePos]),
939 iter_in,
940 tail_count*sizeof(Value_type));
941 } else {
942 for(Size_type i=0; i<tail_count; i++) {
943 new (const_cast<pointer_mutable>(array + localWritePos + i)) value_type( iter_in[i] ); // placement new
944 }
945 }
946 localWritePos = ( localWritePos + tail_count - 1 ) % capacityPlusOne; // last write-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
947 togo_count -= tail_count;
948 iter_in += tail_count;
949 }
950 if( togo_count > 0 ) {
951 // we have a head
952 localWritePos = ( localWritePos + 1 ) % capacityPlusOne; // next-write-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
953 if constexpr ( uses_memcpy ) {
954 ::memcpy(voidptr_cast(&array[localWritePos]),
955 iter_in,
956 togo_count*sizeof(Value_type));
957 } else {
958 for(Size_type i=0; i<togo_count; i++) {
959 new (const_cast<pointer_mutable>(array + localWritePos + i)) value_type( iter_in[i] ); // placement new
960 }
961 }
962 localWritePos = ( localWritePos + togo_count - 1 ) % capacityPlusOne; // last write-pos // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
963 }
964 {
965 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
966 writePos = localWritePos; // SC-DRF release atomic writePos
967 }
968 cvWrite.notify_all(); // notify waiting getter, have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
969 return true;
970 }
971
972 void recapacityImpl(const Size_type newCapacity) {
973 const Size_type size_ = size();
974
975 if( capacityPlusOne == newCapacity+1 ) {
976 return;
977 }
978 if( size_ > newCapacity ) {
979 throw IllegalArgumentException("amount "+std::to_string(newCapacity)+" < size, "+toString(), E_FILE_LINE);
980 }
981
982 // save current data
983 const Size_type oldCapacityPlusOne = capacityPlusOne;
984 Value_type * oldArray = array;
985 Size_type oldReadPos = readPos;
986
987 // new blank resized array, starting at position 0
988 capacityPlusOne = newCapacity + 1;
989 array = newArray(capacityPlusOne);
990 readPos = 0;
991 writePos = 0;
992
993 // copy saved data
994 if( nullptr != oldArray && 0 < size_ ) {
995 Size_type localWritePos = writePos;
996 for(Size_type i=0; i<size_; i++) {
997 localWritePos = (localWritePos + 1) % capacityPlusOne; // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
998 oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne;
999 new (const_cast<pointer_mutable>( array + localWritePos )) value_type( std::move( oldArray[oldReadPos] ) ); // placement new
1000 dtor_one( oldArray + oldReadPos ); // manual destruction, even after std::move (object still exists)
1001 }
1002 writePos = localWritePos;
1003 }
1004 freeArray(&oldArray, oldCapacityPlusOne); // and release
1005 }
1006
1007 void closeImpl(const bool zeromem) noexcept {
1008 if( zeromem ) {
1009 clearAndZeroMemImpl();
1010 } else {
1011 clearImpl();
1012 }
1013 freeArray(&array, capacityPlusOne);
1014
1015 capacityPlusOne = 1;
1016 array = newArray(capacityPlusOne);
1017 readPos = 0;
1018 writePos = 0;
1019 }
1020
1021 public:
1022
1023 /** Returns a short string representation incl. size/capacity and internal r/w index (impl. dependent). */
1024 std::string toString() const noexcept {
1025 const std::string e_s = isEmpty() ? ", empty" : "";
1026 const std::string f_s = isFull() ? ", full" : "";
1027 const std::string mode_s = getMultiPCEnabled() ? ", mpc" : ", one";
1028 return "ringbuffer<?>[size "+std::to_string(size())+" / "+std::to_string(capacityPlusOne-1)+
1029 ", writePos "+std::to_string(writePos)+", readPos "+std::to_string(readPos)+e_s+f_s+mode_s+"]";
1030 }
1031
1032 /** Debug functionality - Dumps the contents of the internal array. */
1033 void dump(FILE *stream, const std::string& prefix) const noexcept {
1034 fprintf(stream, "%s %s, array %p\n", prefix.c_str(), toString().c_str(), array);
1035 }
1036
1037 std::string get_info() const noexcept {
1038 const std::string e_s = isEmpty() ? ", empty" : "";
1039 const std::string f_s = isFull() ? ", full" : "";
1040 const std::string mode_s = getMultiPCEnabled() ? ", mpc" : ", one";
1041 std::string res("ringbuffer<?>[this "+jau::to_hexstring(this)+
1042 ", size "+std::to_string(size())+" / "+std::to_string(capacityPlusOne-1)+
1043 ", "+e_s+f_s+mode_s+", type[integral "+std::to_string(is_integral)+
1044 ", trivialCpy "+std::to_string(std::is_trivially_copyable_v<Value_type>)+
1045 "], uses[mmove "+std::to_string(uses_memmove)+
1046 ", mcpy "+std::to_string(uses_memcpy)+
1047 ", smem "+std::to_string(uses_secmem)+
1048 "]]");
1049 return res;
1050 }
1051
1052 /**
1053 * Create a full ring buffer instance w/ the given array's net capacity and content.
1054 * <p>
1055 * Example for a 10 element int array:
1056 * <pre>
1057 * std::vector<int> source(10);
1058 * // fill source with content ..
1059 * ringbuffer<int, jau::nsize_t> rb(source);
1060 * </pre>
1061 * </p>
1062 * <p>
1063 * {@link #isFull()} returns true on the newly created full ring buffer.
1064 * </p>
1065 * <p>
1066 * Implementation will allocate an internal array with size of array `copyFrom` <i>plus one</i>,
1067 * and copy all elements from array `copyFrom` into the internal array.
1068 * </p>
1069 * @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content.
1070 * This constructor is `noexcept` and does not throw.
1071 */
1072 ringbuffer(const std::vector<Value_type> & copyFrom) noexcept
1073 : capacityPlusOne(copyFrom.size() + 1), array(newArray(capacityPlusOne)),
1074 readPos(0), writePos(0)
1075 {
1076 resetImpl(copyFrom.data(), copyFrom.size());
1077 _DEBUG_DUMP("ctor(vector<Value_type>)");
1078 }
1079
1080 /**
1081 * @param copyFrom mandatory source array determining the ring buffer's net {@link #capacity()} and initial content.
1082 * @param copyFromSize number of elements to copy from `copyFrom`, i.e. the net capacity of the new ring buffer.
1083 */
1084 ringbuffer(const Value_type * copyFrom, const Size_type copyFromSize) noexcept
1085 : capacityPlusOne(copyFromSize + 1), array(newArray(capacityPlusOne)),
1086 readPos(0), writePos(0)
1087 {
1088 resetImpl(copyFrom, copyFromSize);
1089 _DEBUG_DUMP("ctor(Value_type*, len)");
1090 }
1091
1092 /**
1093 * Create an empty ring buffer instance w/ the given net `capacity`.
1094 * <p>
1095 * Example for a net capacity of 10 elements:
1096 * <pre>
1097 * ringbuffer<int, jau::nsize_t> rb(10);
1098 * </pre>
1099 * </p>
1100 * <p>
1101 * {@link #isEmpty()} returns true on the newly created empty ring buffer.
1102 * </p>
1103 * <p>
1104 * Implementation will allocate an internal array of size `capacity` <i>plus one</i>.
1105 * </p>
1107 * @param capacity the initial net capacity of the ring buffer
1108 */
1109 ringbuffer(const Size_type capacity) noexcept
1110 : capacityPlusOne(capacity + 1), array(newArray(capacityPlusOne)),
1111 readPos(0), writePos(0)
1112 {
1113 _DEBUG_DUMP("ctor(capacity)");
1114 }
1115
1116 ~ringbuffer() noexcept {
1117 _DEBUG_DUMP("dtor(def)");
1118 if( nullptr != array ) {
1119 clearImpl();
1120 freeArray(&array, capacityPlusOne);
1121 }
1122 }
1123
1124 ringbuffer(const ringbuffer &_source) noexcept
1125 : capacityPlusOne(_source.capacityPlusOne), array(newArray(capacityPlusOne)),
1126 readPos(0), writePos(0)
1127 {
1128 std::unique_lock<std::mutex> lockMultiReadS(_source.syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
1129 std::unique_lock<std::mutex> lockMultiWriteS(_source.syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
1130 std::lock(lockMultiReadS, lockMultiWriteS); // *this instance does not exist yet
1131 cloneFrom(false, _source);
1132 _DEBUG_DUMP("ctor(copy.this)");
1133 _DEBUG_DUMP2(_source, "ctor(copy.source)");
1134 }
1135
1136 ringbuffer& operator=(const ringbuffer &_source) noexcept {
1137 if( this == &_source ) {
1138 return *this;
1139 }
1140 if( multi_pc_enabled ) {
1141 std::unique_lock<std::mutex> lockMultiReadS(_source.syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
1142 std::unique_lock<std::mutex> lockMultiWriteS(_source.syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
1143 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance!
1144 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
1145 std::lock(lockMultiReadS, lockMultiWriteS, lockMultiRead, lockMultiWrite);
1146
1147 return assignCopyImpl(_source);
1148 } else {
1149 std::unique_lock<std::mutex> lockReadS(_source.syncRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
1150 std::unique_lock<std::mutex> lockWriteS(_source.syncWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
1151 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1152 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1153 std::lock(lockReadS, lockWriteS, lockRead, lockWrite);
1154
1155 return assignCopyImpl(_source);
1156 }
1157 }
1158
1159 ringbuffer(ringbuffer &&o) noexcept = default;
1160 ringbuffer& operator=(ringbuffer &&o) noexcept = default;
1161
1162 /**
1163 * Return whether multiple producer and consumer are enabled,
1164 * see @ref ringbuffer_multi_pc and @ref ringbuffer_single_pc.
1165 *
1166 * Defaults to `false`.
1167 * @see @ref ringbuffer_multi_pc
1168 * @see @ref ringbuffer_single_pc
1169 * @see getMultiPCEnabled()
1170 * @see setMultiPCEnabled()
1171 */
1172 constexpr bool getMultiPCEnabled() const { return multi_pc_enabled; }
1173
1174 /**
1175 * Enable or disable capability to handle multiple producer and consumer,
1176 * see @ref ringbuffer_multi_pc and @ref ringbuffer_single_pc.
1177 *
1178 * Defaults to `false`.
1179 * @see @ref ringbuffer_multi_pc
1180 * @see @ref ringbuffer_single_pc
1181 * @see getMultiPCEnabled()
1182 * @see setMultiPCEnabled()
1183 */
1184 constexpr_non_literal_var void setMultiPCEnabled(const bool v) {
1185 /**
1186 * Using just 'constexpr_non_literal_var' because
1187 * clang: 'unique_lock<std::mutex>' is not literal because it is not an aggregate and has no constexpr constructors other than copy or move constructors
1188 */
1189 if( multi_pc_enabled ) {
1190 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance!
1191 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
1192 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1193 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1194 std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
1195
1196 multi_pc_enabled=v;
1197 } else {
1198 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1199 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1200 std::lock(lockRead, lockWrite);
1201
1202 multi_pc_enabled=v;
1203 }
1204 }
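// Editorial sketch, not part of the original header: switching to the
// @ref ringbuffer_multi_pc mode before spawning worker threads.
//
//   jau::ringbuffer<uint8_t, jau::nsize_t> rb(1024);
//   rb.setMultiPCEnabled(true); // enable the multi producer/consumer locking scheme
//   // ... now several threads may call put*(..) and several threads may call get*(..) concurrently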
1205
1206 /** Returns the net capacity of this ring buffer. */
1207 Size_type capacity() const noexcept { return capacityPlusOne - 1; }
1208
1209 /** Returns the number of free slots available to put. */
1210 Size_type freeSlots() const noexcept { return capacityPlusOne - 1 - size(); }
1211
1212 /** Returns true if this ring buffer is empty, otherwise false. */
1213 bool isEmpty() const noexcept { return writePos == readPos; /* 0 == size */ }
1214
1215 /** Returns true if this ring buffer is full, otherwise false. */
1216 bool isFull() const noexcept {
1217 return ( writePos + 1 ) % capacityPlusOne == readPos; /* W == R - 1 */ // NOLINT(clang-analyzer-core.DivideZero): always capacityPlusOne > 0
1218 }
1219
1220 /** Returns the number of elements in this ring buffer. */
1221 Size_type size() const noexcept {
1222 const Size_type R = readPos;
1223 const Size_type W = writePos;
1224 // W >= R: W - R
1225 // W < R: C+1 - R - 1 + W + 1 = C+1 - R + W
1226 return W >= R ? W - R : capacityPlusOne - R + W;
1227 }
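// Editorial worked example of the two branches above, assuming a net capacity of 7,
// i.e. capacityPlusOne == 8:
//   W >= R: R=2, W=6  =>  size = W - R = 4
//   W <  R: R=6, W=2  =>  size = capacityPlusOne - R + W = 8 - 6 + 2 = 4 (wrapped around)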
1228
1229 /**
1230 * Blocks until at least `min_count` elements have been put
1231 * for subsequent get() and getBlocking().
1232 *
1233 * @param min_count minimum number of available elements to wait for
1234 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1235 * @param timeout_occurred result value set to true if a timeout has occurred
1236 * @return the number of put elements, available for get() and getBlocking()
1237 */
1238 [[nodiscard]] Size_type waitForElements(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1239 if( multi_pc_enabled ) {
1240 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
1241 return waitForElementsImpl(min_count, timeout, timeout_occurred);
1242 } else {
1243 return waitForElementsImpl(min_count, timeout, timeout_occurred);
1244 }
1245 }
1246
1247 /**
1248 * Blocks until at least `min_count` free slots become available
1249 * for subsequent put() and putBlocking().
1250 *
1251 * @param min_count minimum number of free slots
1252 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1253 * @param timeout_occurred result value set to true if a timeout has occurred
1254 * @return the number of free slots, available for put() and putBlocking()
1255 */
1256 [[nodiscard]] Size_type waitForFreeSlots(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1257 if( multi_pc_enabled ) {
1258 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
1259 return waitForFreeSlotsImpl(min_count, timeout, timeout_occurred);
1260 } else {
1261 return waitForFreeSlotsImpl(min_count, timeout, timeout_occurred);
1262 }
1263 }
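// Editorial sketch, not part of the original header: waiting until enough room exists
// for a bulk transfer, using the infinite-wait constant fractions_i64::zero.
//
//   // given: jau::ringbuffer<uint8_t, jau::nsize_t> rb(1024);
//   bool timeout_occurred = false;
//   const jau::nsize_t avail = rb.waitForFreeSlots(64, jau::fractions_i64::zero, timeout_occurred);
//   if( 64 <= avail ) {
//       // enough free slots for a bulk put of up to `avail` elements
//   }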
1264
1265 /**
1266 * Releases all elements available, i.e. size() at the time of the call.
1267 *
1268 * It is the user's obligation to ensure thread safety when using @ref ringbuffer_single_pc,
1269 * as implementation can only synchronize on the blocked put() and get() std::mutex.
1270 *
1271 * Assuming no concurrent put() operation, after the call:
1272 * - {@link #isEmpty()} will return `true`
1273 * - {@link #size()} shall return `0`
1274 *
1275 * @param zeromem pass true to zero ringbuffer memory after releasing elements, otherwise non template type parameter use_secmem determines the behavior (default), see @ref ringbuffer_ntt_params.
1276 * @see @ref ringbuffer_ntt_params
1277 */
1278 void clear(const bool zeromem=false) noexcept {
1279 if( multi_pc_enabled ) {
1280 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance!
1281 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
1282 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1283 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1284 std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
1285
1286 if( zeromem ) {
1287 clearAndZeroMemImpl();
1288 } else {
1289 clearImpl();
1290 }
1291 } else {
1292 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1293 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1294 std::lock(lockRead, lockWrite);
1295
1296 if( zeromem ) {
1297 clearAndZeroMemImpl();
1298 } else {
1299 clearImpl();
1300 }
1301 }
1302 cvRead.notify_all(); // notify waiting writer, have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
1303 }
1304
1305 /**
1306 * Close this ringbuffer by releasing all elements available
1307 * and resizing its capacity to zero,
1308 * essentially causing all subsequent read and write operations to fail.
1309 *
1310 * Potentially blocked writer and reader threads will be
1311 * - notified, allowing them to wake up, and
1312 * - interrupted, aborting their pending write or read operation.
1313 *
1314 * Subsequent write and read operations will fail and not block.
1315 *
1316 * After the call:
1317 * - {@link #isEmpty()} will return `true`
1318 * - {@link #size()} shall return `0`
1319 * - {@link #capacity()} shall return `0`
1320 * - {@link #freeSlots()} shall return `0`
1321 */
1322 void close(const bool zeromem=false) noexcept {
1323 if( multi_pc_enabled ) {
1324 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance!
1325 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
1326 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1327 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1328 std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
1329
1330 closeImpl(zeromem);
1331 interrupted_read = true;
1332 interrupted_write = true;
1333 end_of_input = true;
1334 } else {
1335 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1336 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1337 std::lock(lockRead, lockWrite);
1338
1339 closeImpl(zeromem);
1340 interrupted_write = true;
1341 interrupted_read = true;
1342 end_of_input = true;
1343 }
1344 // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
1345 cvRead.notify_all(); // notify waiting writer
1346 cvWrite.notify_all(); // notify waiting reader
1347 }
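// Editorial sketch, not part of the original header: shutting the ring buffer down,
// which unblocks pending readers/writers and makes all further operations fail.
//
//   // given: jau::ringbuffer<uint8_t, jau::nsize_t> rb(1024);
//   rb.close();      // or rb.close(true) to also zero the released memory
//   // afterwards: rb.isEmpty() == true, rb.size() == 0, rb.capacity() == 0, rb.freeSlots() == 0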
1348
1349 /**
1350 * Clears all elements and add all `copyFrom` elements thereafter, as if reconstructing this ringbuffer instance.
1351 *
1352 * It is the user's obligation to ensure thread safety when using @ref ringbuffer_single_pc,
1353 * as implementation can only synchronize on the blocked put() and get() std::mutex.
1354 *
1355 * @param copyFrom Mandatory array w/ length {@link #capacity()} to be copied into the internal array.
1356 *
1357 * @see getMultiPCEnabled()
1358 * @see setMultiPCEnabled()
1359 * @see @ref ringbuffer_multi_pc
1360 * @see @ref ringbuffer_single_pc
1361 */
1362 void reset(const Value_type * copyFrom, const Size_type copyFromCount) noexcept {
1363 if( multi_pc_enabled ) {
1364 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance!
1365 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
1366 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1367 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1368 std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
1369
1370 resetImpl(copyFrom, copyFromCount);
1371 } else {
1372 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1373 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1374 std::lock(lockRead, lockWrite);
1375
1376 resetImpl(copyFrom, copyFromCount);
1377 }
1378 }
1379
1380 /**
1381 * Clears all elements and add all `copyFrom` elements thereafter, as if reconstructing this ringbuffer instance.
1382 *
1383 * It is the user's obligation to ensure thread safety when using @ref ringbuffer_single_pc,
1384 * as implementation can only synchronize on the blocked put() and get() std::mutex.
1385 *
1386 * @param copyFrom
1387 *
1388 * @see getMultiPCEnabled()
1389 * @see setMultiPCEnabled()
1390 * @see @ref ringbuffer_multi_pc
1391 * @see @ref ringbuffer_single_pc
1392 */
1393 void reset(const std::vector<Value_type> & copyFrom) noexcept {
1394 if( multi_pc_enabled ) {
1395 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance!
1396 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
1397 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1398 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1399 std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
1400
1401 resetImpl(copyFrom.data(), copyFrom.size());
1402 } else {
1403 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1404 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1405 std::lock(lockRead, lockWrite);
1406
1407 resetImpl(copyFrom.data(), copyFrom.size());
1408 }
1409 }
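        /*
         * A hedged usage sketch for reset(..); the element type and relying on the default
         * Size_type template parameter are assumptions, not taken from this header:
         *
         *   jau::ringbuffer<uint8_t> rb(4);              // empty ring buffer, net capacity 4
         *   std::vector<uint8_t> init { 1, 2, 3, 4 };    // length matches capacity()
         *   rb.reset(init);                              // drop all elements, then copy `init`
         *   // rb.size() == 4, rb.isFull() == true
         */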
1410
1411 /**
1412 * Resizes this ring buffer's capacity.
1413 *
1414 * New capacity must be greater than current size.
1415 *
1416 * It is the user's obligation to ensure thread safety when using @ref ringbuffer_single_pc,
1417 * as implementation can only synchronize on the blocked put() and get() std::mutex.
1418 *
1419 * @param newCapacity the new net capacity, must be greater than the current size().
1420 *
1421 * @see getMultiPCEnabled()
1422 * @see setMultiPCEnabled()
1423 * @see @ref ringbuffer_multi_pc
1424 * @see @ref ringbuffer_single_pc
1425 */
1426 void recapacity(const Size_type newCapacity) {
1427 if( multi_pc_enabled ) {
1428 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance!
1429 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
1430 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1431 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1432 std::lock(lockMultiRead, lockMultiWrite, lockRead, lockWrite);
1433 recapacityImpl(newCapacity);
1434 } else {
1435 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
1436 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
1437 std::lock(lockRead, lockWrite);
1438
1439 recapacityImpl(newCapacity);
1440 }
1441 }
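        /*
         * A hedged sketch for recapacity(..), assuming an int element type for illustration;
         * the new capacity must exceed the current size() as stated above:
         *
         *   jau::ringbuffer<int> rb(2);
         *   if( rb.put(1) && rb.put(2) ) {   // buffer full: size() == capacity() == 2
         *       rb.recapacity(8);            // grow storage; queued elements are expected to remain
         *       // rb.recapacity(1);         // would violate `newCapacity > size()`
         *   }
         */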
1442
1443 /**
1444 * Interrupt a potentially blocked reader once.
1445 *
1446 * Call this method to unblock a potentially blocked reader thread once.
1447 */
1448 void interruptReader() noexcept { interrupted_read = true; cvWrite.notify_all(); }
1449
1450 /**
1451 * Set `End of Input` from writer thread, unblocking all read-operations and a potentially currently blocked reader thread.
1452 *
1453 * Calling this method with `true` after concluding writing of input data unblocks all read-operations from this point onwards.
1454 * A potentially currently blocked reader thread is also interrupted and hence unblocked.
1455 */
1456 void set_end_of_input(const bool v=true) noexcept {
1457 end_of_input = v;
1458 if( v ) {
1459 cvWrite.notify_all();
1460 }
1461 }
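        /*
         * A hedged sketch of the end-of-input protocol with one writer- and one reader-thread,
         * assuming a `jau::ringbuffer<uint8_t>` instance `rb`; have_data(), next_byte() and
         * consume() are hypothetical helpers:
         *
         *   // writer thread
         *   while( have_data() ) {
         *       if( !rb.putBlocking( next_byte(), jau::fractions_i64::zero ) ) { break; }
         *   }
         *   rb.set_end_of_input();   // unblocks pending and future read operations
         *
         *   // reader thread
         *   uint8_t b;
         *   while( rb.getBlocking( b, jau::fractions_i64::zero ) ) {
         *       consume(b);          // getBlocking returns false once drained and end-of-input is set
         *   }
         */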
1462
1463 /**
1464 * Interrupt a potentially blocked writer once.
1465 *
1466 * Call this method to unblock a potentially blocked writer thread once.
1467 */
1468 void interruptWriter() noexcept { interrupted_write = true; cvRead.notify_all(); }
1469
1470 /**
1471 * Peeks the next element at the read position w/o modifying pointer, nor blocking.
1472 *
1473 * Method is non-blocking and returns immediately.
1474 *
1475 * @param result storage for the resulting value if successful, otherwise unchanged.
1476 * @return true if successful, otherwise false.
1477 */
1478 [[nodiscard]] bool peek(Value_type& result) noexcept {
1479 bool timeout_occured_dummy;
1480 if( multi_pc_enabled ) {
1481 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
1482 return peekImpl(result, false, fractions_i64::zero, timeout_occured_dummy);
1483 } else {
1484 return peekImpl(result, false, fractions_i64::zero, timeout_occured_dummy);
1485 }
1486 }
1487
1488 /**
1489 * Peeks the next element at the read position w/o modifying pointer, but with blocking.
1490 *
1491 * @param result storage for the resulting value if successful, otherwise unchanged.
1492 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1493 * @param timeout_occurred result value set to true if a timeout has occurred
1494 * @return true if successful, otherwise false.
1495 */
1496 [[nodiscard]] bool peekBlocking(Value_type& result, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1497 if( multi_pc_enabled ) {
1498 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
1499 return peekImpl(result, true, timeout, timeout_occurred);
1500 } else {
1501 return peekImpl(result, true, timeout, timeout_occurred);
1502 }
1503 }
1504
1505 /**
1506 * Peeks the next element at the read position w/o modifying pointer, but with blocking.
1507 *
1508 * @param result storage for the resulting value if successful, otherwise unchanged.
1509 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1510 * @return true if successful, otherwise false.
1511 */
1512 [[nodiscard]] bool peekBlocking(Value_type& result, const fraction_i64& timeout) noexcept {
1513 bool timeout_occurred;
1514 return peekBlocking(result, timeout, timeout_occurred);
1515 }
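        /*
         * A hedged sketch contrasting the non-blocking and blocking peek variants, assuming a
         * `jau::ringbuffer<uint8_t>` instance `rb`; the ~100ms timeout value is illustrative:
         *
         *   uint8_t head;
         *   if( rb.peek(head) ) {                // returns immediately
         *       // `head` holds the oldest element, read position unchanged
         *   }
         *   bool timedout = false;
         *   if( rb.peekBlocking( head, jau::fraction_i64(1l, 10lu), timedout ) ) {
         *       // an element became available within the timeout
         *   } else if( timedout ) {
         *       // no element arrived within ~100ms
         *   }
         */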
1516
1517 /**
1518 * Dequeues the oldest enqueued element, if available.
1519 *
1520 * The ring buffer slot will be released and its value moved to the caller's `result` storage, if successful.
1521 *
1522 * Method is non-blocking and returns immediately.
1523 *
1524 * @param result storage for the resulting value if successful, otherwise unchanged.
1525 * @return true if successful, otherwise false.
1526 */
1527 [[nodiscard]] bool get(Value_type& result) noexcept {
1528 bool timeout_occured_dummy;
1529 if( multi_pc_enabled ) {
1530 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
1531 return moveOutImpl(result, false, fractions_i64::zero, timeout_occured_dummy);
1532 } else {
1533 return moveOutImpl(result, false, fractions_i64::zero, timeout_occured_dummy);
1534 }
1535 }
1536
1537 /**
1538 * Dequeues the oldest enqueued element.
1539 *
1540 * The ring buffer slot will be released and its value moved to the caller's `result` storage, if successful.
1541 *
1542 * @param result storage for the resulting value if successful, otherwise unchanged.
1543 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1544 * @param timeout_occurred result value set to true if a timeout has occurred
1545 * @return true if successful, otherwise false.
1546 */
1547 [[nodiscard]] bool getBlocking(Value_type& result, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1548 if( multi_pc_enabled ) {
1549 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
1550 return moveOutImpl(result, true, timeout, timeout_occurred);
1551 } else {
1552 return moveOutImpl(result, true, timeout, timeout_occurred);
1553 }
1554 }
1555
1556 /**
1557 * Dequeues the oldest enqueued element.
1558 *
1559 * The ring buffer slot will be released and its value moved to the caller's `result` storage, if successful.
1560 *
1561 * @param result storage for the resulting value if successful, otherwise unchanged.
1562 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1563 * @return true if successful, otherwise false.
1564 */
1565 [[nodiscard]] bool getBlocking(Value_type& result, const fraction_i64& timeout) noexcept {
1566 bool timeout_occurred;
1567 return getBlocking(result, timeout, timeout_occurred);
1568 }
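        /*
         * A hedged sketch for the blocking single-element get with timeout, assuming a
         * `jau::ringbuffer<uint8_t>` instance `rb`; the timeout value is illustrative:
         *
         *   uint8_t v;
         *   bool timedout = false;
         *   if( rb.getBlocking( v, jau::fraction_i64(1l, 2lu), timedout ) ) {   // wait up to ~500ms
         *       // `v` holds the dequeued element, its slot has been released
         *   } else if( timedout ) {
         *       // the timeout elapsed before an element was available
         *   } else {
         *       // interrupted or end-of-input reached while empty
         *   }
         */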
1569
1570 /**
1571 * Dequeues the oldest enqueued `min(dest_len, size()>=min_count)` elements by copying them into the given consecutive 'dest' storage.
1572 *
1573 * The ring buffer slots will be released and their values moved to the caller's `dest` storage, if successful.
1574 *
1575 * Method is non-blocking and returns immediately.
1576 *
1577 * @param dest pointer to first storage element of `dest_len` consecutive elements to store the values, if successful.
1578 * @param dest_len number of consecutive elements in `dest`, hence maximum number of elements to return.
1579 * @param min_count minimum number of consecutive elements to return.
1580 * @return actual number of elements returned
1581 */
1582 [[nodiscard]] Size_type get(Value_type *dest, const Size_type dest_len, const Size_type min_count) noexcept {
1583 bool timeout_occured_dummy;
1584 if( multi_pc_enabled ) {
1585 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
1586 return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero, timeout_occured_dummy);
1587 } else {
1588 return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero, timeout_occured_dummy);
1589 }
1590 }
1591
1592 /**
1593 * Dequeues the oldest enqueued `min(dest_len, size()>=min_count)` elements by copying them into the given consecutive 'dest' storage.
1594 *
1595 * The ring buffer slots will be released and their values moved to the caller's `dest` storage, if successful.
1596 *
1597 * @param dest pointer to first storage element of `dest_len` consecutive elements to store the values, if successful.
1598 * @param dest_len number of consecutive elements in `dest`, hence maximum number of elements to return.
1599 * @param min_count minimum number of consecutive elements to return
1600 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1601 * @param timeout_occurred result value set to true if a timeout has occurred
1602 * @return actual number of elements returned
1603 */
1604 [[nodiscard]] Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1605 if( multi_pc_enabled ) {
1606 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
1607 return moveOutImpl(dest, dest_len, min_count, true, timeout, timeout_occurred);
1608 } else {
1609 return moveOutImpl(dest, dest_len, min_count, true, timeout, timeout_occurred);
1610 }
1611 }
1612
1613 /**
1614 * Dequeues the oldest enqueued `min(dest_len, size()>=min_count)` elements by copying them into the given consecutive 'dest' storage.
1615 *
1616 * The ring buffer slots will be released and their values moved to the caller's `dest` storage, if successful.
1617 *
1618 * @param dest pointer to first storage element of `dest_len` consecutive elements to store the values, if successful.
1619 * @param dest_len number of consecutive elements in `dest`, hence maximum number of elements to return.
1620 * @param min_count minimum number of consecutive elements to return
1621 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1622 * @return actual number of elements returned
1623 */
1624 [[nodiscard]] Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const fraction_i64& timeout) noexcept {
1625 bool timeout_occurred;
1626 return getBlocking(dest, dest_len, min_count, timeout, timeout_occurred);
1627 }
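        /*
         * A hedged sketch for the bulk getBlocking(..) variant, requesting between `min_count`
         * and `dest_len` elements from an assumed `jau::ringbuffer<uint8_t>` instance `rb`;
         * buffer size and timeout are illustrative:
         *
         *   uint8_t chunk[64];
         *   bool timedout = false;
         *   const auto got = rb.getBlocking( chunk, 64, 16,                  // accept 16..64 elements
         *                                    jau::fraction_i64(1l, 1lu),     // wait up to ~1s
         *                                    timedout );
         *   if( got > 0 ) {
         *       // process chunk[0 .. got-1]
         *   }
         */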
1628
1629 /**
1630 * Drops up to `max_count` oldest enqueued elements,
1631 * but may drop fewer if not as many are available.
1632 *
1633 * Method is non-blocking and returns immediately.
1634 *
1635 * @param max_count maximum number of elements to drop from ringbuffer.
1636 * @return number of elements dropped
1637 */
1638 Size_type drop(const Size_type max_count) noexcept {
1639 bool timeout_occured_dummy;
1640 if( multi_pc_enabled ) {
1641 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
1642 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
1643 std::lock(lockMultiRead, lockMultiWrite);
1644 return dropImpl(max_count, false, fractions_i64::zero, timeout_occured_dummy);
1645 } else {
1646 return dropImpl(max_count, false, fractions_i64::zero, timeout_occured_dummy);
1647 }
1648 }
1649
1650 /**
1651 * Drops exactly `count` oldest enqueued elements,
1652 * will block until they become available.
1653 *
1654 * If `count` elements are not available to drop even after
1655 * blocking for `timeout`, no element will be dropped.
1656 *
1657 * @param count number of elements to drop from ringbuffer.
1658 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1659 * @param timeout_occurred result value set to true if a timeout has occurred
1660 * @return true if successful, otherwise false
1661 */
1662 bool dropBlocking(const Size_type count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1663 if( multi_pc_enabled ) {
1664 std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
1665 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
1666 std::lock(lockMultiRead, lockMultiWrite);
1667 return 0 != dropImpl(count, true, timeout, timeout_occurred);
1668 } else {
1669 return 0 != dropImpl(count, true, timeout, timeout_occurred);
1670 }
1671 }
1672
1673 /**
1674 * Drops exactly `count` oldest enqueued elements,
1675 * will block until they become available.
1676 *
1677 * If `count` elements are not available to drop even after
1678 * blocking for `timeout`, no element will be dropped.
1679 *
1680 * @param count number of elements to drop from ringbuffer.
1681 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1682 * @return true if successful, otherwise false
1683 */
1684 bool dropBlocking(const Size_type count, const fraction_i64& timeout) noexcept {
1685 bool timeout_occurred;
1686 return dropBlocking(count, timeout, timeout_occurred);
1687 }
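        /*
         * A hedged sketch contrasting drop(..) and dropBlocking(..), assuming a
         * `jau::ringbuffer<uint8_t>` instance `rb`; counts and timeout are illustrative:
         *
         *   const auto dropped = rb.drop(10);    // drops up to 10 elements, possibly fewer
         *   (void) dropped;
         *
         *   bool timedout = false;
         *   if( rb.dropBlocking( 10, jau::fraction_i64(1l, 4lu), timedout ) ) {
         *       // exactly 10 oldest elements have been discarded
         *   } else if( timedout ) {
         *       // fewer than 10 elements arrived within ~250ms, nothing was dropped
         *   }
         */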
1688
1689 /**
1690 * Enqueues the given element by moving it into this ringbuffer storage.
1691 *
1692 * Returns true if successful, otherwise false in case buffer is full.
1693 *
1694 * Method is non-blocking and returns immediately.
1695 *
1696 * @param e value to be moved into this ringbuffer
1697 * @return true if successful, otherwise false
1698 */
1699 [[nodiscard]] bool put(Value_type && e) noexcept {
1700 bool timeout_occured_dummy;
1701 if( multi_pc_enabled ) {
1702 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
1703 return moveIntoImpl(std::move(e), false, fractions_i64::zero, timeout_occured_dummy);
1704 } else {
1705 return moveIntoImpl(std::move(e), false, fractions_i64::zero, timeout_occured_dummy);
1706 }
1707 }
1708
1709 /**
1710 * Enqueues the given element by moving it into this ringbuffer storage.
1711 *
1712 * @param e value to be moved into this ringbuffer
1713 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1714 * @param timeout_occurred result value set to true if a timeout has occurred
1715 * @return true if successful, otherwise false, e.g. in case a timeout or other failure occurred.
1716 */
1717 [[nodiscard]] bool putBlocking(Value_type && e, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1718 if( multi_pc_enabled ) {
1719 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
1720 return moveIntoImpl(std::move(e), true, timeout, timeout_occurred);
1721 } else {
1722 return moveIntoImpl(std::move(e), true, timeout, timeout_occurred);
1723 }
1724 }
1725
1726 /**
1727 * Enqueues the given element by moving it into this ringbuffer storage.
1728 *
1729 * @param e value to be moved into this ringbuffer
1730 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1731 * @return true if successful, otherwise false, e.g. in case a timeout or other failure occurred.
1732 */
1733 [[nodiscard]] bool putBlocking(Value_type && e, const fraction_i64& timeout) noexcept {
1734 bool timeout_occurred;
1735 return putBlocking(std::move(e), timeout, timeout_occurred);
1736 }
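        /*
         * A hedged sketch for the move-enqueueing put variants, useful for expensive-to-copy
         * element types; the `Packet` payload type and timeout are hypothetical examples:
         *
         *   struct Packet { std::vector<uint8_t> data; };
         *   jau::ringbuffer<Packet> rb(16);
         *
         *   Packet p { { 0x01, 0x02, 0x03 } };
         *   if( !rb.put( std::move(p) ) ) {
         *       // buffer full, nothing enqueued
         *   }
         *   Packet q { { 0x04 } };
         *   bool timedout = false;
         *   if( !rb.putBlocking( std::move(q), jau::fraction_i64(1l, 10lu), timedout ) ) {
         *       // not enqueued within ~100ms, or interrupted
         *   }
         */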
1737
1738 /**
1739 * Enqueues the given element by copying it into this ringbuffer storage.
1740 *
1741 * Returns true if successful, otherwise false in case buffer is full.
1742 *
1743 * Method is non-blocking and returns immediately.
1744 *
1745 * @return true if successful, otherwise false
1746 */
1747 [[nodiscard]] bool put(const Value_type & e) noexcept {
1748 bool timeout_occured_dummy;
1749 if( multi_pc_enabled ) {
1750 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
1751 return copyIntoImpl(e, false, fractions_i64::zero, timeout_occured_dummy);
1752 } else {
1753 return copyIntoImpl(e, false, fractions_i64::zero, timeout_occured_dummy);
1754 }
1755 }
1756
1757 /**
1758 * Enqueues the given element by copying it into this ringbuffer storage.
1759 *
1760 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1761 * @param timeout_occurred result value set to true if a timeout has occurred
1762 * @return true if successful, otherwise false, e.g. in case a timeout or other failure occurred.
1763 */
1764 [[nodiscard]] bool putBlocking(const Value_type & e, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1765 if( multi_pc_enabled ) {
1766 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
1767 return copyIntoImpl(e, true, timeout, timeout_occurred);
1768 } else {
1769 return copyIntoImpl(e, true, timeout, timeout_occurred);
1770 }
1771 }
1772
1773 /**
1774 * Enqueues the given element by copying it into this ringbuffer storage.
1775 *
1776 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1777 * @return true if successful, otherwise false, e.g. in case a timeout or other failure occurred.
1778 */
1779 [[nodiscard]] bool putBlocking(const Value_type & e, const fraction_i64& timeout) noexcept {
1780 bool timeout_occurred;
1781 return putBlocking(e, timeout, timeout_occurred);
1782 }
1783
1784 /**
1785 * Enqueues the given range of consecutive elements by copying them into this ringbuffer storage.
1786 *
1787 * Returns true if successful, otherwise false in case buffer is full.
1788 *
1789 * Method is non-blocking and returns immediately.
1790 *
1791 * @param first pointer to the first element of the consecutive value_type range [first, last)
1792 * @param last pointer to the end of the consecutive value_type range [first, last)
1793 * @return true if successful, otherwise false
1794 */
1795 [[nodiscard]] bool put(const Value_type *first, const Value_type* last) noexcept {
1796 bool timeout_occured_dummy;
1797 if( multi_pc_enabled ) {
1798 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
1799 return copyIntoImpl(first, last, false, fractions_i64::zero, timeout_occured_dummy);
1800 } else {
1801 return copyIntoImpl(first, last, false, fractions_i64::zero, timeout_occured_dummy);
1802 }
1803 }
1804
1805 /**
1806 * Enqueues the given range of consecutive elements by copying them into this ringbuffer storage.
1807 *
1808 * @param first pointer to the first element of the consecutive value_type range [first, last)
1809 * @param last pointer to the end of the consecutive value_type range [first, last)
1810 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1811 * @param timeout_occurred result value set to true if a timeout has occurred
1812 * @return true if successful, otherwise false, e.g. in case a timeout or other failure occurred.
1813 */
1814 [[nodiscard]] bool putBlocking(const Value_type *first, const Value_type* last, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
1815 if( multi_pc_enabled ) {
1816 std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
1817 return copyIntoImpl(first, last, true, timeout, timeout_occurred);
1818 } else {
1819 return copyIntoImpl(first, last, true, timeout, timeout_occurred);
1820 }
1821 }
1822
1823 /**
1824 * Enqueues the given range of consecutive elements by copying them into this ringbuffer storage.
1825 *
1826 * @param first pointer to the first element of the consecutive value_type range [first, last)
1827 * @param last pointer to the end of the consecutive value_type range [first, last)
1828 * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
1829 * @return true if successful, otherwise false, e.g. in case a timeout or other failure occurred.
1830 */
1831 [[nodiscard]] bool putBlocking(const Value_type *first, const Value_type* last, const fraction_i64& timeout) noexcept {
1832 bool timeout_occurred;
1833 return putBlocking(first, last, timeout, timeout_occurred);
1834 }
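        /*
         * A hedged sketch for the range put variants, enqueueing a consecutive [first, last)
         * block in one call, assuming a `jau::ringbuffer<uint8_t>` instance `rb`; array size
         * and timeout are illustrative:
         *
         *   uint8_t frame[32];   // filled elsewhere
         *   if( !rb.put( frame, frame + 32 ) ) {
         *       // the full range could not be enqueued (not enough free slots)
         *   }
         *   bool timedout = false;
         *   if( !rb.putBlocking( frame, frame + 32, jau::fraction_i64(1l, 10lu), timedout ) ) {
         *       // still no room for the full range after ~100ms, or interrupted
         *   }
         */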
1835};
1836
1837/**@}*/
1838
1839} /* namespace jau */
1840
1841/** \example test_lfringbuffer01.cpp
1842 * This C++ unit test validates jau::ringbuffer w/o parallel processing.
1843 * <p>
1844 * With test_lfringbuffer11.cpp, this work verifies jau::ringbuffer correctness.
1845 * </p>
1846 */
1847
1848/** \example test_lfringbuffer11.cpp
1849 * This C++ unit test validates jau::ringbuffer with parallel processing.
1850 * <p>
1851 * With test_lfringbuffer01.cpp, this work verifies jau::ringbuffer correctness.
1852 * </p>
1853 */
1854
1855#endif /* JAU_RINGBUFFER_HPP_ */