Direct-BT v3.3.0-1-gc2d430c
Direct-BT - Direct Bluetooth Programming.
byte_stream.hpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sublicense, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be
14 * included in all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 */
24
25#ifndef JAU_BYTE_STREAM_HPP_
26#define JAU_BYTE_STREAM_HPP_
27
28#include <string>
29#include <cstdint>
30#include <functional>
31#include <thread>
32
33#include <jau/basic_types.hpp>
34#include <jau/ringbuffer.hpp>
35#include <jau/file_util.hpp>
36
37#include <jau/io_util.hpp>
38
39using namespace jau::fractions_i64_literals;
40
41namespace jau::io {
42
43 /** \addtogroup IOUtils
44 *
45 * @{
46 */
47
48 /**
49 * Mimic std::ios_base::iostate for state functionality, see iostate_func.
50 *
51 * This `enum class` type fulfills `C++ named requirements: BitmaskType`.
52 */
53 enum class iostate : uint32_t {
54 /** No error occurred nor has EOS been reached. No bit is set, i.e. the value is zero. */
55 none = 0,
56
57 /** No error occurred nor has EOS been reached. No bit is set, same value as `none`. */
58 goodbit = 0,
59
60 /** Irrecoverable stream error, including loss of integrity of the underlying stream or media. */
61 badbit = 1 << 0,
62
63 /** An input operation reached the end of its stream. */
64 eofbit = 1 << 1,
65
66 /** Input or output operation failed (formatting or extraction error). */
67 failbit = 1 << 2,
68
69 /** Input or output operation failed due to timeout. */
70 timeout = 1 << 3
71 };
/** Returns the underlying `uint32_t` bit pattern of the given iostate value. */
72 constexpr uint32_t number(const iostate rhs) noexcept {
73 return static_cast<uint32_t>(rhs);
74 }
/** Returns the bitwise complement of rhs, computed over its full `uint32_t` value. */
75 constexpr iostate operator ~(const iostate rhs) noexcept {
76 return static_cast<iostate> ( ~number(rhs) );
77 }
/** Returns the bitwise exclusive-or of lhs and rhs. */
78 constexpr iostate operator ^(const iostate lhs, const iostate rhs) noexcept {
79 return static_cast<iostate> ( number(lhs) ^ number(rhs) );
80 }
/** Returns the bitwise or of lhs and rhs, i.e. the union of both bit sets. */
81 constexpr iostate operator |(const iostate lhs, const iostate rhs) noexcept {
82 return static_cast<iostate> ( number(lhs) | number(rhs) );
83 }
/** Returns the bitwise and of lhs and rhs, i.e. the bits set in both. */
84 constexpr iostate operator &(const iostate lhs, const iostate rhs) noexcept {
85 return static_cast<iostate> ( number(lhs) & number(rhs) );
86 }
/** Sets in lhs every bit that is set in rhs and returns lhs by reference. */
87 constexpr iostate& operator |=(iostate& lhs, const iostate rhs) noexcept {
88 lhs = static_cast<iostate> ( number(lhs) | number(rhs) );
89 return lhs;
90 }
/** Keeps in lhs only the bits that are also set in rhs and returns lhs by reference. */
91 constexpr iostate& operator &=(iostate& lhs, const iostate rhs) noexcept {
92 lhs = static_cast<iostate> ( number(lhs) & number(rhs) );
93 return lhs;
94 }
/** Toggles in lhs every bit that is set in rhs and returns lhs by reference. */
95 constexpr iostate& operator ^=(iostate& lhs, const iostate rhs) noexcept {
96 lhs = static_cast<iostate> ( number(lhs) ^ number(rhs) );
97 return lhs;
98 }
/** Returns true if lhs and rhs have the same underlying value, i.e. identical bit sets. */
99 constexpr bool operator ==(const iostate lhs, const iostate rhs) noexcept {
100 return number(lhs) == number(rhs);
101 }
/** Returns true if lhs and rhs differ in at least one bit; negation of operator==. */
102 constexpr bool operator !=(const iostate lhs, const iostate rhs) noexcept {
103 return !( lhs == rhs );
104 }
105 std::string to_string(const iostate mask) noexcept;
106
107 /**
108 * Supporting std::basic_ios's iostate functionality for all ByteInStream implementations.
109 */
111 private:
112 iostate m_state;
113
114 protected:
115 constexpr iostate rdstate_impl() const noexcept { return m_state; }
116 constexpr void setstate_impl(iostate state) const noexcept { const_cast<iostate_func*>(this)->m_state |= state; }
117
118 public:
119 iostate_func() noexcept
120 : m_state( iostate::goodbit ) {}
121
122 iostate_func(const iostate_func& o) noexcept = default;
123 iostate_func(iostate_func&& o) noexcept = default;
124 iostate_func& operator=(const iostate_func &o) noexcept = default;
125 iostate_func& operator=(iostate_func &&o) noexcept = default;
126
127 virtual ~iostate_func() noexcept = default;
128
129 /** Clears state flags by assignment to the given value. */
130 virtual void clear(const iostate state = iostate::goodbit) noexcept { m_state = state; }
131
132 /**
133 * Returns the current state flags.
134 *
135 * Method is marked `virtual` to allow implementations with asynchronous resources
136 * to determine or update the current iostate.
137 *
138 * Method is used throughout all query members and setstate(),
139 * hence they all will use the updated state from a potential override implementation.
140 */
141 virtual iostate rdstate() const noexcept { return m_state; }
142
143 /** Sets state flags, by keeping its previous bits. */
144 void setstate(const iostate state) noexcept { clear( rdstate() | state ); }
145
146 /** Checks if no error nor eof() has occurred i.e. I/O operations are available. */
147 bool good() const noexcept
148 { return iostate::goodbit == rdstate(); }
149
150 /** Checks if end-of-file has been reached. */
151 bool eof() const noexcept
152 { return iostate::none != ( rdstate() & iostate::eofbit ); }
153
154 /** Checks if an error has occurred. */
155 bool fail() const noexcept
157
158 /** Checks if an error has occurred, synonym of fail(). */
159 bool operator!() const noexcept { return fail(); }
160
161 /** Checks if no error has occurred, synonym of !fail(). */
162 explicit operator bool() const noexcept { return !fail(); }
163
164 /** Checks if a non-recoverable error has occurred. */
165 bool bad() const noexcept
166 { return iostate::none != ( rdstate() & iostate::badbit ); }
167
168 /** Checks if a timeout (non-recoverable) has occurred. */
169 bool timeout() const noexcept
170 { return iostate::none != ( rdstate() & iostate::timeout ); }
171 };
172
173 /**
174 * Abstract byte input stream object.
175 *
176 * @anchor byte_in_stream_properties
177 * ### ByteInStream Properties
178 * The byte input stream can originate from a local source w/o delay,
179 * remote URL like http connection or even from another thread feeding the input buffer.<br />
180 * Both latter asynchronous resources may expose blocking properties
181 * in available().
182 *
183 * Asynchronous resources benefit from knowing their content size,
184 * as their available() implementation may avoid
185 * blocking and waiting for requested bytes available
186 * if the stream is already beyond its scope.
187 *
188 * All method implementations are of `noexcept`.
189 *
190 * One may use fail() to detect whether an error has occurred,
191 * while end_of_data() not only covers the end-of-stream (EOS) case but includes fail().
192 *
193 * @see @ref byte_in_stream_properties "ByteInStream Properties"
194 */
196 {
197 public:
198 ByteInStream() noexcept
199 : iostate_func() {}
200
201 ~ByteInStream() noexcept override = default;
202 ByteInStream& operator=(const ByteInStream&) = delete;
203 ByteInStream(const ByteInStream&) = delete;
204
205 /** Checks if the stream has an associated file. */
206 virtual bool is_open() const noexcept = 0;
207
208 /**
209 * Close the stream if supported by the underlying mechanism.
210 */
211 virtual void close() noexcept = 0;
212
213 /**
214 * Return whether n bytes are available in the input stream,
215 * if has_content_size() or using an asynchronous source.
216 *
217 * If !has_content_size() and not being an asynchronous source,
218 * !end_of_data() is returned.
219 *
220 * Method may be blocking when using an asynchronous source
221 * up until the requested bytes are available.
222 *
223 * A subsequent call to read() shall return immediately with at least
224 * the requested numbers of bytes available,
225 * if has_content_size() or using an asynchronous source.
226 *
227 * See details of the implementing class.
228 *
229 * @param n byte count to wait for
230 * @return true if n bytes are available, otherwise false
231 *
232 * @see has_content_size()
233 * @see read()
234 * @see @ref byte_in_stream_properties "ByteInStream Properties"
235 */
236 virtual bool available(size_t n) noexcept = 0;
237
238 /**
239 * Read from the source. Moves the internal offset so that every
240 * call to read will return a new portion of the source.
241 *
242 * Use available() to try to wait for a certain amount of bytes available.
243 *
244 * This method shall only block until `min(available, length)` bytes are transferred.
245 *
246 * See details of the implementing class.
247 *
248 * @param out the byte array to write the result to
249 * @param length the length of the byte array out
250 * @return length in bytes that was actually read and put into out
251 *
252 * @see available()
253 * @see @ref byte_in_stream_properties "ByteInStream Properties"
254 */
255 [[nodiscard]] virtual size_t read(void* out, size_t length) noexcept = 0;
256
257 /**
258 * Read one byte.
259 * @param out the byte to read to
260 * @return true if one byte has been read, false otherwise
261 */
262 [[nodiscard]] bool read(uint8_t& out) noexcept;
263
264 /**
265 * Read from the source but do not modify the internal
266 * offset. Consecutive calls to peek() will return portions of
267 * the source starting at the same position.
268 *
269 * @param out the byte array to write the output to
270 * @param length the length of the byte array out
271 * @param peek_offset the offset into the stream to read at
272 * @return length in bytes that was actually read and put into out
273 */
274 [[nodiscard]] virtual size_t peek(void* out, size_t length, size_t peek_offset) noexcept = 0;
275
276 /**
277 * Peek at one byte.
278 * @param out an output byte
279 * @return true if one byte has been peeked, false otherwise
280 */
281 [[nodiscard]] bool peek(uint8_t& out) noexcept;
282
283 /**
284 * Discard the next N bytes of the data
285 * @param N the number of bytes to discard
286 * @return number of bytes actually discarded
287 */
288 [[nodiscard]] size_t discard(size_t N) noexcept;
289
290 /**
291 * return the id of this data source
292 * @return std::string representing the id of this data source
293 */
294 virtual std::string id() const noexcept { return ""; }
295
296 /**
297 * Returns the input position indicator, similar to std::basic_istream.
298 *
299 * @return number of bytes read so far.
300 */
301 virtual uint64_t tellg() const noexcept = 0;
302
303 /**
304 * Returns true if implementation is aware of content_size(), otherwise false.
305 * @see content_size()
306 */
307 virtual bool has_content_size() const noexcept = 0;
308
309 /**
310 * Returns the content_size if known.
311 * @see has_content_size()
312 */
313 virtual uint64_t content_size() const noexcept = 0;
314
315 virtual std::string to_string() const noexcept = 0;
316 };
317
318 /**
319 * Secure Memory-Based byte input stream
320 */
322 public:
323 [[nodiscard]] size_t read(void*, size_t) noexcept override;
324
325 [[nodiscard]] size_t peek(void*, size_t, size_t) noexcept override;
326
327 bool available(size_t n) noexcept override;
328
329 /**
330 * Construct a secure memory source that reads from a string
331 * @param in the string to read from
332 */
333 explicit ByteInStream_SecMemory(const std::string& in);
334
335 /**
336 * Construct a secure memory source that reads from a byte array
337 * @param in the byte array to read from
338 * @param length the length of the byte array
339 */
340 ByteInStream_SecMemory(const uint8_t in[], size_t length)
341 : m_source(in, in + length), m_offset(0) {}
342
343 /**
344 * Construct a secure memory source that reads from a secure_vector
345 * @param in the secure_vector to read from
346 */
348 : m_source(std::move(in)), m_offset(0) {}
349
350 /**
351 * Construct a secure memory source that reads from a std::vector
352 * @param in the std::vector to read from
353 */
354 explicit ByteInStream_SecMemory(const std::vector<uint8_t>& in)
355 : m_source(in.begin(), in.end()), m_offset(0) {}
356
357 bool is_open() const noexcept override { return true; }
358
359 void close() noexcept override;
360
361 ~ByteInStream_SecMemory() noexcept override { close(); }
362
363 uint64_t tellg() const noexcept override { return m_offset; }
364
365 bool has_content_size() const noexcept override { return true; }
366
367 uint64_t content_size() const noexcept override { return m_source.size(); }
368
369 std::string to_string() const noexcept override;
370
371 private:
372 io::secure_vector<uint8_t> m_source;
373 size_t m_offset;
374 };
375
376 /**
377 * File based byte input stream, including named file descriptor.
378 *
379 * Implementation mimics std::ifstream via OS level file descriptor (FD) operations,
380 * giving more flexibility, allowing reusing existing FD and enabling openat() operations.
381 *
382 * If source path denotes a named file descriptor, i.e. jau::fs::file_stats::is_fd() returns true,
383 * has_content_size() returns false and available() returns true as long as the stream is open and EOS hasn't occurred.
384 */
385 class ByteInStream_File final : public ByteInStream {
386 private:
388
389 int m_fd;
390
391 bool m_has_content_length;
392 uint64_t m_content_size;
393 uint64_t m_bytes_consumed;
394 uint64_t get_available() const noexcept { return m_has_content_length ? m_content_size - m_bytes_consumed : 0; }
395
396 public:
397 [[nodiscard]] size_t read(void*, size_t) noexcept override;
398 [[nodiscard]] size_t peek(void*, size_t, size_t) noexcept override;
399 bool available(size_t n) noexcept override;
400
401 bool is_open() const noexcept override { return 0 <= m_fd; }
402
403 std::string id() const noexcept override { return stats.path(); }
404
405 /**
406 * Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
407 *
408 * @see is_open()
409 */
410 int fd() const noexcept { return m_fd; }
411
412 /**
413 * Construct a stream based byte input stream from filesystem path
414 *
415 * In case the given path is a local file URI starting with `file://`, see jau::io::uri::is_local_file_protocol(),
416 * the leading `file://` is cut off and the remainder being used.
417 *
418 * @param path the path to the file, maybe a local file URI
419 */
420 ByteInStream_File(const std::string& path) noexcept;
421
422 /**
423 * Construct a stream based byte input stream from filesystem path and parent directory file descriptor
424 *
425 * In case the given path is a local file URI starting with `file://`, see jau::io::uri::is_local_file_protocol(),
426 * the leading `file://` is cut off and the remainder being used.
427 *
428 * @param dirfd parent directory file descriptor
429 * @param path the path to the file, maybe a local file URI
430 */
431 ByteInStream_File(const int dirfd, const std::string& path) noexcept;
432
433 /**
434 * Construct a stream based byte input stream by duplicating given file descriptor
435 *
436 * In case the given path is a local file URI starting with `file://`, see jau::io::uri::is_local_file_protocol(),
437 * the leading `file://` is cut off and the remainder being used.
438 *
439 * @param fd file descriptor to duplicate leaving the given `fd` untouched
440 */
441 ByteInStream_File(const int fd) noexcept;
442
444
446
447 void close() noexcept override;
448
449 ~ByteInStream_File() noexcept override { close(); }
450
451 uint64_t tellg() const noexcept override { return m_bytes_consumed; }
452
453 bool has_content_size() const noexcept override { return m_has_content_length; }
454
455 uint64_t content_size() const noexcept override { return m_content_size; }
456
457 std::string to_string() const noexcept override;
458 };
459
460 /**
461 * Ringbuffer-Based byte input stream with a URL connection provisioned data feed.
462 *
463 * Standard implementation uses [curl](https://curl.se/),
464 * hence all [*libcurl* network protocols](https://curl.se/docs/url-syntax.html) are supported,
465 * jau::io::uri::supported_protocols().
466 */
467 class ByteInStream_URL final : public ByteInStream {
468 public:
469 /**
470 * Check whether n bytes are available in the input stream.
471 *
472 * Wait up to timeout duration set in constructor until n bytes become available, where fractions_i64::zero waits infinitely.
473 *
474 * This method is blocking.
475 *
476 * @param n byte count to wait for
477 * @return true if n bytes are available, otherwise false
478 *
479 * @see read()
480 * @see @ref byte_in_stream_properties "ByteInStream Properties"
481 */
482 bool available(size_t n) noexcept override;
483
484 /**
485 * Read from the source. Moves the internal offset so that every
486 * call to read will return a new portion of the source.
487 *
488 * Use available() to wait and ensure a certain amount of bytes are available.
489 *
490 * This method is not blocking beyond the transfer of `min(available, length)` bytes.
491 *
492 * @param out the byte array to write the result to
493 * @param length the length of the byte array out
494 * @return length in bytes that was actually read and put into out
495 *
496 * @see available()
497 * @see @ref byte_in_stream_properties "ByteInStream Properties"
498 */
499 [[nodiscard]] size_t read(void* out, size_t length) noexcept override;
500
501 [[nodiscard]] size_t peek(void* out, size_t length, size_t peek_offset) noexcept override;
502
503 iostate rdstate() const noexcept override;
504
505 std::string id() const noexcept override { return m_url; }
506
507 /**
508 * Construct a ringbuffer backed Http byte input stream
509 * @param url the URL of the data to read
510 * @param timeout maximum duration in fractions of seconds to wait @ available() for next bytes, where fractions_i64::zero waits infinitely
511 */
512 ByteInStream_URL(std::string url, const jau::fraction_i64& timeout) noexcept;
513
515
517
518 bool is_open() const noexcept override;
519
520 void close() noexcept override;
521
522 ~ByteInStream_URL() noexcept override { close(); }
523
524 uint64_t tellg() const noexcept override { return m_bytes_consumed; }
525
526 bool has_content_size() const noexcept override { return m_has_content_length; }
527
528 uint64_t content_size() const noexcept override { return m_content_size; }
529
530 std::string to_string() const noexcept override;
531
532 private:
533 uint64_t get_available() const noexcept { return m_has_content_length ? m_content_size - m_bytes_consumed : 0; }
534 std::string to_string_int() const noexcept;
535
536 const std::string m_url;
537 jau::fraction_i64 m_timeout;
538 ByteRingbuffer m_buffer;
539 jau::io::url_header_sync m_header_sync;
540 jau::relaxed_atomic_bool m_has_content_length;
541 jau::relaxed_atomic_uint64 m_content_size;
542 jau::relaxed_atomic_uint64 m_total_xfered;
544 std::unique_ptr<std::thread> m_url_thread;
545 uint64_t m_bytes_consumed;
546 };
547
548 /**
549 * Parses the given path_or_uri. If it matches a supported protocol, see jau::io::uri::protocol_supported(),
550 * but is not a local file, see jau::io::uri::is_local_file_protocol(), a ByteInStream_URL is attempted.
551 *
552 * If the above fails, a ByteInStream_File is attempted.
553 *
554 * If none of the above leads to a ByteInStream without ByteInStream::fail(), nullptr is returned.
555 *
556 * @param path_or_uri given path or uri for which a ByteInStream instance shall be established.
557 * @param timeout in case `path_or_uri` resolves to ByteInStream_URL, timeout is being used as maximum duration to wait for next bytes at ByteInStream_URL::available(), defaults to 20_s
558 * @return a working ByteInStream w/o ByteInStream::fail() or nullptr
559 */
560 std::unique_ptr<ByteInStream> to_ByteInStream(const std::string& path_or_uri, jau::fraction_i64 timeout=20_s) noexcept;
561
562 /**
563 * Ringbuffer-Based byte input stream with an externally provisioned data feed.
564 */
565 class ByteInStream_Feed final : public ByteInStream {
566 public:
567 /**
568 * Check whether n bytes are available in the input stream.
569 *
570 * Wait up to timeout duration set in constructor until n bytes become available, where fractions_i64::zero waits infinitely.
571 *
572 * This method is blocking.
573 *
574 * @param n byte count to wait for
575 * @return true if n bytes are available, otherwise false
576 *
577 * @see read()
578 * @see @ref byte_in_stream_properties "ByteInStream Properties"
579 */
580 bool available(size_t n) noexcept override;
581
582 /**
583 * Read from the source. Moves the internal offset so that every
584 * call to read will return a new portion of the source.
585 *
586 * Use available() to wait and ensure a certain amount of bytes are available.
587 *
588 * This method is not blocking beyond the transfer of `min(available, length)` bytes.
589 *
590 * @param out the byte array to write the result to
591 * @param length the length of the byte array out
592 * @return length in bytes that was actually read and put into out
593 *
594 * @see available()
595 * @see @ref byte_in_stream_properties "ByteInStream Properties"
596 */
597 [[nodiscard]] size_t read(void* out, size_t length) noexcept override;
598
599 [[nodiscard]] size_t peek(void* out, size_t length, size_t peek_offset) noexcept override;
600
601 iostate rdstate() const noexcept override;
602
603 std::string id() const noexcept override { return m_id; }
604
605 /**
606 * Construct a ringbuffer backed externally provisioned byte input stream
607 * @param id_name arbitrary identifier for this instance
608 * @param timeout maximum duration in fractions of seconds to wait @ available() and write(), where fractions_i64::zero waits infinitely
609 */
610 ByteInStream_Feed(std::string id_name, const jau::fraction_i64& timeout) noexcept;
611
613
615
616 bool is_open() const noexcept override;
617
618 void close() noexcept override;
619
620 ~ByteInStream_Feed() noexcept override { close(); }
621
622 uint64_t tellg() const noexcept override { return m_bytes_consumed; }
623
624 bool has_content_size() const noexcept override { return m_has_content_length; }
625
626 uint64_t content_size() const noexcept override { return m_content_size; }
627
628 /**
629 * Interrupt a potentially blocked reader.
630 *
631 * Call this method if intended to abort streaming and to interrupt the reader thread's potentially blocked available() call,
632 * i.e. done at set_eof()
633 *
634 * @see set_eof()
635 */
636 void interruptReader() noexcept {
637 m_buffer.interruptReader();
638 }
639
640 /**
641 * Write given bytes to the async ringbuffer using explicit given timeout.
642 *
643 * Wait up to explicit given timeout duration until ringbuffer space is available, where fractions_i64::zero waits infinitely.
644 *
645 * This method is blocking.
646 *
648 * @param in the byte array to transfer to the async ringbuffer
649 * @param length the length of the byte array `in`
650 * @param timeout explicit given timeout for async ringbuffer put operation
651 * @return true if successful, otherwise false on timeout or stopped feeder and subsequent calls to good() will return false.
652 */
653 [[nodiscard]] bool write(uint8_t in[], size_t length, const jau::fraction_i64& timeout) noexcept;
654
655 /**
656 * Write given bytes to the async ringbuffer.
657 *
658 * Wait up to timeout duration set in constructor until ringbuffer space is available, where fractions_i64::zero waits infinitely.
659 *
660 * This method is blocking.
661 *
663 * @param in the byte array to transfer to the async ringbuffer
664 * @param length the length of the byte array `in`
665 * @return true if successful, otherwise false on timeout or stopped feeder and subsequent calls to good() will return false.
666 */
667 [[nodiscard]] bool write(uint8_t in[], size_t length) noexcept {
668 return write(in, length, m_timeout);
669 }
670
671 /**
672 * Set known content size, informal only.
673 * @param size the content size in bytes
674 */
675 void set_content_size(const uint64_t size) noexcept {
676 m_content_size = size;
677 m_has_content_length = true;
678 }
679
680 /**
681 * Set end-of-data (EOS), i.e. when feeder completed provisioning bytes.
682 *
683 * Implementation issues interruptReader() to unblock a potentially blocked reader thread.
684 *
685 * @param result should be either result_t::FAILED or result_t::SUCCESS.
686 *
687 * @see interruptReader()
688 */
689 void set_eof(const async_io_result_t result) noexcept;
690
691 std::string to_string() const noexcept override;
692
693 private:
694 uint64_t get_available() const noexcept { return m_has_content_length ? m_content_size - m_bytes_consumed : 0; }
695 std::string to_string_int() const noexcept;
696
697 const std::string m_id;
698 jau::fraction_i64 m_timeout;
699 ByteRingbuffer m_buffer;
700 jau::relaxed_atomic_bool m_has_content_length;
701 jau::relaxed_atomic_uint64 m_content_size;
702 jau::relaxed_atomic_uint64 m_total_xfered;
704 uint64_t m_bytes_consumed;
705 };
706
707 /**
708 * Wrapped byte input stream with the capability
709 * to record the read byte stream at will.
710 *
711 * Peek'ed bytes won't be recorded, only read bytes.
712 */
713 class ByteInStream_Recorder final : public ByteInStream {
714 public:
715 [[nodiscard]] size_t read(void*, size_t) noexcept override;
716
717 [[nodiscard]] size_t peek(void* out, size_t length, size_t peek_offset) noexcept override {
718 return m_parent.peek(out, length, peek_offset);
719 }
720
721 bool available(size_t n) noexcept override {
722 return m_parent.available(n);
723 }
724
725 void clear(const iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); }
726 iostate rdstate() const noexcept override { return m_parent.rdstate(); }
727
728 std::string id() const noexcept override { return m_parent.id(); }
729
730 /**
731 * Construct a byte input stream wrapper using the given parent ByteInStream.
732 * @param parent the parent ByteInStream
733 * @param buffer a user defined buffer for the recording
734 */
736 : m_parent(parent), m_bytes_consumed(0), m_buffer(buffer), m_rec_offset(0), m_is_recording(false) {};
737
739
741
742 bool is_open() const noexcept override { return m_parent.is_open(); }
743
744 void close() noexcept override;
745
746 ~ByteInStream_Recorder() noexcept override { close(); }
747
748 uint64_t tellg() const noexcept override { return m_bytes_consumed; }
749
750 bool has_content_size() const noexcept override { return m_parent.has_content_size(); }
751
752 uint64_t content_size() const noexcept override { return m_parent.content_size(); }
753
754 /**
755 * Starts the recording.
756 * <p>
757 * A potential previous recording will be cleared.
758 * </p>
759 */
760 void start_recording() noexcept;
761
762 /**
763 * Stops the recording.
764 * <p>
765 * The recording persists.
766 * </p>
767 */
768 void stop_recording() noexcept;
769
770 /**
771 * Clears the recording.
772 * <p>
773 * If the recording was ongoing, also stops the recording.
774 * </p>
775 */
776 void clear_recording() noexcept;
777
778 /** Returns the reference of the recording buffer given by user. */
779 io::secure_vector<uint8_t>& get_recording() noexcept { return m_buffer; }
780
781 size_t get_bytes_recorded() noexcept { return m_buffer.size(); }
782
783 /** Returns the recording start position. */
784 uint64_t get_recording_start_pos() noexcept { return m_rec_offset; }
785
786 bool is_recording() noexcept { return m_is_recording; }
787
788 std::string to_string() const noexcept override;
789
790 private:
791 ByteInStream& m_parent;
792 uint64_t m_bytes_consumed;
793 io::secure_vector<uint8_t>& m_buffer;
794 uint64_t m_rec_offset;
795 bool m_is_recording;
796 };
797
798 /**
799 * Abstract byte output stream object,
800 * to write data to a sink.
801 *
802 * All method implementations are of `noexcept`.
803 *
804 * One may use fail() to detect whether an error has occurred.
805 */
807 {
808 public:
809 ByteOutStream() = default;
810 ~ByteOutStream() noexcept override = default;
811 ByteOutStream& operator=(const ByteOutStream&) = delete;
812 ByteOutStream(const ByteOutStream&) = delete;
813
814 /** Checks if the stream has an associated file. */
815 virtual bool is_open() const noexcept = 0;
816
817 /**
818 * Close the stream if supported by the underlying mechanism.
819 */
820 virtual void close() noexcept = 0;
821
822 /**
823 * Write to the data sink. Moves the internal offset so that every
824 * call to write will be appended to the sink.
825 *
826 * This method is not blocking beyond the transfer length bytes.
827 *
828 * @param in the input bytes to write out
829 * @param length the length of the byte array in
830 * @return length in bytes that were actually written
831 */
832 [[nodiscard]] virtual size_t write(const void* in, size_t length) noexcept = 0;
833
834 /**
835 * Write one byte.
836 * @param in the byte to be written
837 * @return true if one byte has been written, otherwise false
838 */
839 [[nodiscard]] bool write(const uint8_t& in) noexcept;
840
841 /**
842 * return the id of this data source
843 * @return std::string representing the id of this data source
844 */
845 virtual std::string id() const noexcept { return ""; }
846
847 /**
848 * Returns the output position indicator.
849 *
850 * @return number of bytes written so far.
851 */
852 virtual uint64_t tellp() const noexcept = 0;
853
854 virtual std::string to_string() const noexcept = 0;
855 };
856
857 /**
858 * File based byte output stream, including named file descriptor.
859 */
860 class ByteOutStream_File final : public ByteOutStream {
861 private:
863 /**
864 * We mimic std::ofstream via OS level file descriptor operations,
865 * giving us more flexibility and enabling use of openat() operations.
866 */
867 int m_fd;
868
869 // Remember: constexpr specifier used in a function or static data member (since C++17) declaration implies inline
870
871 public:
872 bool is_open() const noexcept override { return 0 <= m_fd; }
873
874 [[nodiscard]] size_t write(const void*, size_t) noexcept override;
875
876 std::string id() const noexcept override { return stats.path(); }
877
878 /**
879 * Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
880 *
881 * @see is_open()
882 */
883 int fd() const noexcept { return m_fd; }
884
885 /**
886 * Construct a stream based byte output stream from filesystem path,
887 * either an existing or new file.
888 *
889 * In case the file already exists, the underlying file offset is positioned at the end of the file.
890 *
891 * In case the given path is a local file URI starting with `file://`, see jau::io::uri::is_local_file_protocol(),
892 * the leading `file://` is cut off and the remainder being used.
893 *
894 * @param path the path to the file, maybe a local file URI
895 * @param mode file protection mode for a new file, otherwise ignored.
896 */
897 ByteOutStream_File(const std::string& path, const jau::fs::fmode_t mode = jau::fs::fmode_t::def_file_prot) noexcept;
898
899 /**
900 * Construct a stream based byte output stream from filesystem path and parent directory file descriptor,
901 * either an existing or new file.
902 *
903 * In case the file already exists, the underlying file offset is positioned at the end of the file.
904 *
905 * In case the given path is a local file URI starting with `file://`, see jau::io::uri::is_local_file_protocol(),
906 * the leading `file://` is cut off and the remainder being used.
907 *
908 * @param dirfd parent directory file descriptor
909 * @param path the path to the file, maybe a local file URI
910 * @param mode file protection mode for a new file, otherwise ignored.
911 */
912 ByteOutStream_File(const int dirfd, const std::string& path, const jau::fs::fmode_t mode = jau::fs::fmode_t::def_file_prot) noexcept;
913
914 /**
915 * Construct a stream based byte output stream by duplicating given file descriptor
916 *
917 * In case the given path is a local file URI starting with `file://`, see jau::io::uri::is_local_file_protocol(),
918 * the leading `file://` is cut off and the remainder being used.
919 *
920 * @param fd file descriptor to duplicate leaving the given `fd` untouched
921 */
922 ByteOutStream_File(const int fd) noexcept;
923
925
927
928 void close() noexcept override;
929
930 ~ByteOutStream_File() noexcept override { close(); }
931
932 uint64_t tellp() const noexcept override { return m_bytes_consumed; }
933
934 std::string to_string() const noexcept override;
935
936 private:
937 uint64_t m_bytes_consumed;
938 };
939
940
941 /**@}*/
942
943} // namespace jau::io
944
945#endif /* JAU_BYTE_STREAM_HPP_ */
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
Definition: file_util.hpp:406
std::string path() const noexcept
Returns the unix path representation.
Definition: file_util.hpp:530
Ringbuffer-Based byte input stream with an externally provisioned data feed.
void set_content_size(const uint64_t size) noexcept
Set known content size, informal only.
ByteInStream_Feed & operator=(const ByteInStream_Feed &)=delete
uint64_t tellg() const noexcept override
Returns the input position indicator, similar to std::basic_istream.
void interruptReader() noexcept
Interrupt a potentially blocked reader.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
ByteInStream_Feed(const ByteInStream_Feed &)=delete
bool write(uint8_t in[], size_t length) noexcept
Write given bytes to the async ringbuffer.
uint64_t content_size() const noexcept override
Returns the content_size if known.
std::string id() const noexcept override
return the id of this data source
File based byte input stream, including named file descriptor.
uint64_t content_size() const noexcept override
Returns the content_size if known.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
ByteInStream_File(const ByteInStream_File &)=delete
std::string id() const noexcept override
return the id of this data source
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
uint64_t tellg() const noexcept override
Returns the input position indicator, similar to std::basic_istream.
ByteInStream_File & operator=(const ByteInStream_File &)=delete
Wrapped byte input stream with the capability to record the read byte stream at will.
size_t get_bytes_recorded() noexcept
uint64_t tellg() const noexcept override
Returns the input position indicator, similar to std::basic_istream.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
iostate rdstate() const noexcept override
Returns the current state flags.
std::string id() const noexcept override
return the id of this data source
bool is_open() const noexcept override
Checks if the stream has an associated file.
ByteInStream_Recorder(ByteInStream &parent, io::secure_vector< uint8_t > &buffer) noexcept
Construct a byte input stream wrapper using the given parent ByteInStream.
ByteInStream_Recorder & operator=(const ByteInStream_Recorder &)=delete
void clear(const iostate state=iostate::goodbit) noexcept override
Clears state flags by assignment to the given value.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
uint64_t get_recording_start_pos() noexcept
Returns the recording start position.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
uint64_t content_size() const noexcept override
Returns the content_size if known.
ByteInStream_Recorder(const ByteInStream_Recorder &)=delete
Secure Memory-Based byte input stream.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
uint64_t content_size() const noexcept override
Returns the content_size if known.
ByteInStream_SecMemory(io::secure_vector< uint8_t > in)
Construct a secure memory source that reads from a secure_vector.
uint64_t tellg() const noexcept override
Returns the input position indicator, similar to std::basic_istream.
ByteInStream_SecMemory(const std::vector< uint8_t > &in)
Construct a secure memory source that reads from a std::vector.
ByteInStream_SecMemory(const uint8_t in[], size_t length)
Construct a secure memory source that reads from a byte array.
bool is_open() const noexcept override
Checks if the stream has an associated file.
Ringbuffer-Based byte input stream with a URL connection provisioned data feed.
ByteInStream_URL(const ByteInStream_URL &)=delete
uint64_t tellg() const noexcept override
Returns the input position indicator, similar to std::basic_istream.
ByteInStream_URL & operator=(const ByteInStream_URL &)=delete
uint64_t content_size() const noexcept override
Returns the content_size if known.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
std::string id() const noexcept override
return the id of this data source
Abstract byte input stream object.
virtual void close() noexcept=0
Close the stream if supported by the underlying mechanism.
virtual bool has_content_size() const noexcept=0
Returns true if implementation is aware of content_size(), otherwise false.
virtual uint64_t content_size() const noexcept=0
Returns the content_size if known.
virtual size_t peek(void *out, size_t length, size_t peek_offset) noexcept=0
Read from the source but do not modify the internal offset.
virtual bool is_open() const noexcept=0
Checks if the stream has an associated file.
size_t discard(size_t N) noexcept
Discard the next N bytes of the data.
Definition: byte_stream.cpp:90
virtual uint64_t tellg() const noexcept=0
Returns the input position indicator, similar to std::basic_istream.
virtual std::string to_string() const noexcept=0
~ByteInStream() noexcept override=default
virtual size_t read(void *out, size_t length) noexcept=0
Read from the source.
virtual bool available(size_t n) noexcept=0
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
File based byte output stream, including named file descriptor.
ByteOutStream_File & operator=(const ByteOutStream_File &)=delete
std::string id() const noexcept override
return the id of this data source
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
uint64_t tellp() const noexcept override
Returns the output position indicator.
bool is_open() const noexcept override
Checks if the stream has an associated file.
ByteOutStream_File(const ByteOutStream_File &)=delete
Abstract byte output stream object, to write data to a sink.
virtual uint64_t tellp() const noexcept=0
Returns the output position indicator.
~ByteOutStream() noexcept override=default
Supporting std::basic_ios's iostate functionality for all ByteInStream implementations.
bool fail() const noexcept
Checks if an error has occurred.
iostate_func & operator=(const iostate_func &o) noexcept=default
iostate_func & operator=(iostate_func &&o) noexcept=default
constexpr iostate rdstate_impl() const noexcept
iostate_func(iostate_func &&o) noexcept=default
void setstate(const iostate state) noexcept
Sets state flags, by keeping its previous bits.
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool bad() const noexcept
Checks if a non-recoverable error has occurred.
bool operator!() const noexcept
Checks if an error has occurred, synonym of fail().
virtual ~iostate_func() noexcept=default
bool eof() const noexcept
Checks if end-of-file has been reached.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
iostate_func(const iostate_func &o) noexcept=default
virtual iostate rdstate() const noexcept
Returns the current state flags.
virtual void clear(const iostate state=iostate::goodbit) noexcept
Clears state flags by assignment to the given value.
constexpr void setstate_impl(iostate state) const noexcept
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
Definition: file_util.hpp:236
@ def_file_prot
Default file protection bit: Safe default: POSIX S_IRUSR | S_IWUSR | S_IRGRP or read_usr | write_usr ...
std::vector< T, jau::callocator_sec< T > > secure_vector
Definition: io_util.hpp:46
iostate
Mimic std::ios_base::iostate for state functionality, see iostate_func.
Definition: byte_stream.hpp:53
constexpr iostate operator^(const iostate lhs, const iostate rhs) noexcept
Definition: byte_stream.hpp:78
constexpr iostate operator|(const iostate lhs, const iostate rhs) noexcept
Definition: byte_stream.hpp:81
std::unique_ptr< ByteInStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
constexpr iostate operator~(const iostate rhs) noexcept
Definition: byte_stream.hpp:75
async_io_result_t
Asynchronous I/O operation result value.
Definition: io_util.hpp:68
jau::ringbuffer< uint8_t, size_t > ByteRingbuffer
Definition: io_util.hpp:50
constexpr iostate operator&(const iostate lhs, const iostate rhs) noexcept
Definition: byte_stream.hpp:84
constexpr uint32_t number(const iostate rhs) noexcept
Definition: byte_stream.hpp:72
constexpr iostate & operator^=(iostate &lhs, const iostate rhs) noexcept
Definition: byte_stream.hpp:95
std::string to_string(const iostate mask) noexcept
jau::ordered_atomic< async_io_result_t, std::memory_order_relaxed > relaxed_atomic_async_io_result_t
Definition: io_util.hpp:78
constexpr iostate & operator|=(iostate &lhs, const iostate rhs) noexcept
Definition: byte_stream.hpp:87
constexpr iostate & operator&=(iostate &lhs, const iostate rhs) noexcept
Definition: byte_stream.hpp:91
constexpr bool operator!=(const iostate lhs, const iostate rhs) noexcept
constexpr bool operator==(const iostate lhs, const iostate rhs) noexcept
Definition: byte_stream.hpp:99
@ goodbit
No error occurred nor has EOS been reached.
@ none
No error occurred nor has EOS been reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream.
@ timeout
Input or output operation failed due to timeout.
@ badbit
Irrecoverable stream error, including loss of integrity of the underlying stream or media.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
Definition: backtrace.hpp:32
STL namespace.