jaulib v1.3.0
Jau Support Library (C++, Java, ..)
byte_stream.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 *
5 * ByteInStream, ByteInStream_SecMemory and ByteInStream_istream are derived from Botan under same license:
6 * - Copyright (c) 1999-2007 Jack Lloyd
7 * - Copyright (c) 2005 Matthew Gregan
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining
10 * a copy of this software and associated documentation files (the
11 * "Software"), to deal in the Software without restriction, including
12 * without limitation the rights to use, copy, modify, merge, publish,
13 * distribute, sublicense, and/or sell copies of the Software, and to
14 * permit persons to whom the Software is furnished to do so, subject to
15 * the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be
18 * included in all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 */
28
29#include <chrono>
30
31// #include <botan_all.h>
32
33#include <jau/debug.hpp>
34#include <jau/byte_stream.hpp>
35
36extern "C" {
37 #include <unistd.h>
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41}
42
43#ifdef USE_LIBCURL
44 #include <curl/curl.h>
45#endif
46
47#include <thread>
48#include <pthread.h>
49
50#ifndef O_BINARY
51#define O_BINARY 0
52#endif
53#ifndef O_NONBLOCK
54#define O_NONBLOCK 0
55#endif
56
57using namespace jau::io;
58using namespace jau::fractions_i64_literals;
59using namespace jau::int_literals;
60
61#if defined(__FreeBSD__)
62 typedef off_t off64_t;
63 #define __posix_openat64 ::openat
64 #define __posix_lseek64 ::lseek
65#else
66 #define __posix_openat64 ::openat64
67 #define __posix_lseek64 ::lseek64
68#endif
69
70#ifdef USE_LIBCURL
71 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * (size_t)CURL_MAX_WRITE_SIZE;
72#else
73 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * 16384_uz;
74#endif
75
/**
 * Copies `n` bytes from `in` to `out` using std::memmove, hence overlapping ranges are fine.
 *
 * The call is a no-op if either pointer is null or `n` is zero.
 */
inline constexpr void copy_mem(void* out, const void* in, size_t n) noexcept {
    const bool nothing_to_do = ( nullptr == in ) || ( nullptr == out ) || ( 0 == n );
    if( nothing_to_do ) {
        return;
    }
    std::memmove(out, in, n * sizeof(uint8_t));
}
81
82bool ByteInStream::read(uint8_t& out) noexcept {
83 return 1 == read(&out, 1);
84}
85
86bool ByteInStream::peek(uint8_t& out) noexcept {
87 return 1 == peek(&out, 1, 0);
88}
89
90size_t ByteInStream::discard(size_t n) noexcept {
91 uint8_t buf[1024] = { 0 };
92 size_t discarded = 0;
93
94 while(n)
95 {
96 const size_t got = read(buf, std::min(n, sizeof(buf)));
97 if( 0 == got ) {
98 break;
99 }
100 discarded += got;
101 n -= got;
102 }
103
104 return discarded;
105}
106
107size_t ByteInStream_SecMemory::read(void* out, size_t length) noexcept {
108 if( 0 == length || !good() ) {
109 return 0;
110 }
111 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
112 copy_mem(out, m_source.data() + m_offset, got);
113 m_offset += got;
114 if( m_source.size() == m_offset ) {
115 setstate_impl( iostate::eofbit );
116 }
117 return got;
118}
119
120bool ByteInStream_SecMemory::available(size_t n) noexcept {
121 return m_source.size() - m_offset >= n;
122}
123
124size_t ByteInStream_SecMemory::peek(void* out, size_t length, size_t peek_offset) noexcept {
125 const size_t bytes_left = m_source.size() - m_offset;
126 if(peek_offset >= bytes_left) {
127 return 0;
128 }
129 const size_t got = std::min(bytes_left - peek_offset, length);
130 copy_mem(out, &m_source[m_offset + peek_offset], got);
131 return got;
132}
133
134ByteInStream_SecMemory::ByteInStream_SecMemory(const std::string& in)
135: m_source(cast_char_ptr_to_uint8(in.data()),
136 cast_char_ptr_to_uint8(in.data()) + in.length()),
137 m_offset(0)
138{ }
139
141 m_source.clear();
142 m_offset = 0;
144}
145
146std::string ByteInStream_SecMemory::to_string() const noexcept {
147 return "ByteInStream_SecMemory[content size "+jau::to_decstring(m_source.size())+
148 ", consumed "+jau::to_decstring(m_offset)+
149 ", available "+jau::to_decstring(m_source.size()-m_offset)+
150 ", iostate["+jau::io::to_string(rdstate())+
151 "]]";
152}
153
/**
 * Appends `bitstr` to `out` if all bits of `bit` are set in `mask`.
 *
 * A ", " separator is written first when `comma` is true;
 * `comma` is set to true after each appended name.
 */
template<typename T>
static void append_bitstr(std::string& out, T mask, T bit, const std::string& bitstr, bool& comma) {
    const bool is_set = ( bit == ( mask & bit ) );
    if( !is_set ) {
        return;
    }
    if( comma ) {
        out += ", ";
    }
    out += bitstr;
    comma = true;
}
161
// Expands to one append_bitstr() call for enum value U::V, labelled with the stringified W.
// Expects the locals `out` and `comma` to exist at the expansion site.
#define APPEND_BITSTR(U,V,W,M) \
    append_bitstr(out, M, U::V, #W, comma);

// Applies X to each listed iostate bit together with its short name, passing M through.
#define IOSTATE_ENUM(X,M)          \
    X(iostate,badbit,bad,M)        \
    X(iostate,eofbit,eof,M)        \
    X(iostate,failbit,fail,M)      \
    X(iostate,timeout,timeout,M)
169
170std::string jau::io::to_string(const iostate mask) noexcept {
171 if( iostate::goodbit == mask ) {
172 return "good";
173 }
174 std::string out;
175 bool comma = false;
177 return out;
178}
179
180size_t ByteInStream_File::read(void* out, size_t length) noexcept {
181 if( 0 == length || !good() ) {
182 return 0;
183 }
184 uint8_t* out_u8 = static_cast<uint8_t*>(out);
185 size_t total = 0;
186 while( total < length ) {
187 ssize_t len;
188 while ( ( len = ::read(m_fd, out_u8+total, length-total) ) < 0 ) {
189 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
190 // cont temp unavail or interruption
191 // unlikely for regular files and we open w/o O_NONBLOCK
192 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
193 // - EINTR (signal)
194 continue;
195 }
196 // Check errno == ETIMEDOUT ??
197 setstate_impl( iostate::failbit );
198 DBG_PRINT("ByteInStream_File::read: Error occurred in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
199 return 0;
200 }
201 total += static_cast<size_t>(len);
202 if( 0 == len || ( m_has_content_length && m_bytes_consumed + total >= m_content_size ) ) {
203 setstate_impl( iostate::eofbit ); // Note: std::istream also sets iostate::failbit on eof, we don't.
204 break;
205 }
206 }
207 m_bytes_consumed += total;
208 return total;
209}
210
211size_t ByteInStream_File::peek(void* out, size_t length, size_t offset) noexcept {
212 if( 0 == length || !good() || offset > std::numeric_limits<off64_t>::max() ||
213 ( m_has_content_length && m_content_size - m_bytes_consumed < offset + 1 /* min number of bytes to read */ ) ) {
214 return 0;
215 }
216 size_t got = 0;
217
218 off64_t abs_pos = 0;
219 if( 0 < offset ) {
220 abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(offset), SEEK_CUR);
221 if( 0 > abs_pos ) {
222 setstate_impl( iostate::failbit );
223 DBG_PRINT("ByteInStream_File::peek: Error occurred (offset1 %zd) in %s, errno %d %s",
224 offset, to_string().c_str(), errno, strerror(errno));
225 return 0;
226 }
227 }
228 if( abs_pos == static_cast<off64_t>(offset) ) {
229 ssize_t len = 0;
230 while ( ( len = ::read(m_fd, out, length) ) < 0 ) {
231 if ( errno == EAGAIN || errno == EINTR ) {
232 // cont temp unavail or interruption
233 continue;
234 }
235 // Check errno == ETIMEDOUT ??
236 setstate_impl( iostate::failbit );
237 DBG_PRINT("ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
238 return 0;
239 }
240 got = len; // potentially zero bytes, i.e. eof
241 }
242 if( __posix_lseek64(m_fd, static_cast<off64_t>(m_bytes_consumed), SEEK_SET) < 0 ) {
243 // even though we were able to fetch the desired data above, let's fail if position reset fails
244 setstate_impl( iostate::failbit );
245 DBG_PRINT("ByteInStream_File::peek: Error occurred (offset2 %zd) in %s, errno %d %s",
246 offset, to_string().c_str(), errno, strerror(errno));
247 return 0;
248 }
249 return got;
250}
251
252bool ByteInStream_File::available(size_t n) noexcept {
253 return is_open() && good() && ( !m_has_content_length || m_content_size - m_bytes_consumed >= (uint64_t)n );
254};
255
257: ByteInStream(),
258 stats(fd), m_fd(-1), m_has_content_length(false), m_content_size(0), m_bytes_consumed(0)
259{
260 if( !stats.exists() || !stats.has_access() ) {
261 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
262 DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
263 } else {
264 m_has_content_length = stats.has( jau::fs::file_stats::field_t::size );
265 m_content_size = m_has_content_length ? stats.size() : 0;
266 m_fd = ::dup(fd);
267 if ( 0 > m_fd ) {
268 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
269 DBG_PRINT("ByteInStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(), to_string().c_str());
270 }
271 }
272}
273
274ByteInStream_File::ByteInStream_File(const int dirfd, const std::string& path) noexcept
275: ByteInStream(),
276 stats(), m_fd(-1), m_has_content_length(false), m_content_size(0), m_bytes_consumed(0)
277{
279 // cut of leading `file://`
280 std::string path2 = path.substr(7);
281 stats = jau::fs::file_stats(dirfd, path2);
282 } else {
283 stats = jau::fs::file_stats(dirfd, path);
284 }
285 if( !stats.exists() || !stats.has_access() ) {
286 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
287 DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
288 } else {
289 if( stats.has( jau::fs::file_stats::field_t::size ) ) {
290 m_has_content_length = true;
291 m_content_size = stats.size();
292 } else {
293 m_has_content_length = false;
294 m_content_size = 0;
295 }
296 // O_NONBLOCK, is useless on files and counter to this class logic
297 const int src_flags = O_RDONLY|O_BINARY|O_NOCTTY;
298 if( stats.has_fd() ) {
299 m_fd = ::dup( stats.fd() );
300 } else {
301 m_fd = __posix_openat64(dirfd, stats.path().c_str(), src_flags);
302 }
303 if ( 0 > m_fd ) {
304 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
305 DBG_PRINT("ByteInStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(), to_string().c_str());
306 }
307 }
308}
309
310ByteInStream_File::ByteInStream_File(const std::string& path) noexcept
311: ByteInStream_File(AT_FDCWD, path) {}
312
314 if( 0 <= m_fd ) {
315 ::close(m_fd);
316 m_fd = -1;
318 }
319}
320
321std::string ByteInStream_File::to_string() const noexcept {
322 return "ByteInStream_File[content_length "+( has_content_size() ? jau::to_decstring(m_content_size) : "n/a" )+
323 ", consumed "+jau::to_decstring(m_bytes_consumed)+
324 ", available "+jau::to_decstring(get_available())+
325 ", fd "+std::to_string(m_fd)+
326 ", iostate["+jau::io::to_string(rdstate())+
327 "], "+stats.to_string()+
328 "]";
329}
330
331
332ByteInStream_URL::ByteInStream_URL(std::string url, const jau::fraction_i64& timeout) noexcept
333: m_url(std::move(url)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
334 m_header_sync(), m_has_content_length( false ), m_content_size( 0 ),
335 m_total_xfered( 0 ), m_result( io::async_io_result_t::NONE ),
336 m_bytes_consumed(0)
337
338{
339 m_url_thread = read_url_stream(m_url, m_buffer, m_header_sync, m_has_content_length, m_content_size, m_total_xfered, m_result);
340 if( nullptr == m_url_thread ) {
341 // url protocol not supported
342 m_result = async_io_result_t::FAILED;
343 }
344}
345
346void ByteInStream_URL::close() noexcept {
347 DBG_PRINT("ByteInStream_URL: close.0 %s, %s", id().c_str(), to_string_int().c_str());
348
349 if( async_io_result_t::NONE == m_result ) {
350 m_result = async_io_result_t::SUCCESS; // signal end of streaming
351 }
352
353 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
354 if( nullptr != m_url_thread && m_url_thread->joinable() ) {
355 DBG_PRINT("ByteInStream_URL: close.1 %s, %s", id().c_str(), m_buffer.toString().c_str());
356 m_url_thread->join();
357 }
358 m_url_thread = nullptr;
359 DBG_PRINT("ByteInStream_URL: close.X %s, %s", id().c_str(), to_string_int().c_str());
360}
361
362bool ByteInStream_URL::available(size_t n) noexcept {
363 if( !good() || async_io_result_t::NONE != m_result ) {
364 // url thread ended, only remaining bytes in buffer available left
365 return m_buffer.size() >= n;
366 }
367 m_header_sync.wait_until_completion(m_timeout);
368 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
369 return false;
370 }
371 // I/O still in progress, we have to poll until data is available or timeout
372 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
373 bool timeout_occured;
374 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
375 if( avail < n ) {
376 if( timeout_occured ) {
377 setstate_impl( iostate::timeout );
378 if( async_io_result_t::NONE == m_result ) {
379 m_result = async_io_result_t::FAILED;
380 }
381 m_buffer.interruptWriter();
382 }
383 return false;
384 } else {
385 return true;
386 }
387}
388
389bool ByteInStream_URL::is_open() const noexcept {
390 // url thread has not ended or remaining bytes in buffer available left
391 return async_io_result_t::NONE == m_result || m_buffer.size() > 0;
392}
393
394size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
395 m_header_sync.wait_until_completion(m_timeout);
396 if( 0 == length || !good() ) {
397 return 0;
398 }
399 bool timeout_occured = false;
400 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
401 const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
402 m_bytes_consumed += got;
403 if( timeout_occured ) {
404 setstate_impl( iostate::timeout );
405 if( async_io_result_t::NONE == m_result ) {
406 m_result = async_io_result_t::FAILED;
407 }
408 m_buffer.interruptWriter();
409 }
410 // DBG_PRINT("ByteInStream_URL::read: size %zu/%zu bytes, %s", got, length, to_string_int().c_str() );
411 return got;
412}
413
414size_t ByteInStream_URL::peek(void* out, size_t length, size_t peek_offset) noexcept {
415 (void)out;
416 (void)length;
417 (void)peek_offset;
418 ERR_PRINT("ByteInStream_URL::peek not implemented");
419 return 0;
420}
421
423 if ( ( async_io_result_t::NONE != m_result && m_buffer.isEmpty() ) ||
424 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
425 {
427 }
428 if( async_io_result_t::FAILED == m_result ) {
430 }
431 return rdstate_impl();
432}
433
434std::string ByteInStream_URL::to_string_int() const noexcept {
435 return m_url+", Url[content_length "+( has_content_size() ? jau::to_decstring(m_content_size.load()) : "n/a" )+
436 ", xfered "+jau::to_decstring(m_total_xfered.load())+
437 ", result "+std::to_string((int8_t)m_result.load())+
438 "], consumed "+jau::to_decstring(m_bytes_consumed)+
439 ", available "+jau::to_decstring(get_available())+
440 ", iostate["+jau::io::to_string(rdstate())+
441 "], "+m_buffer.toString();
442}
443std::string ByteInStream_URL::to_string() const noexcept {
444 return "ByteInStream_URL["+to_string_int()+"]";
445}
446
447std::unique_ptr<ByteInStream> jau::io::to_ByteInStream(const std::string& path_or_uri, jau::fraction_i64 timeout) noexcept {
448 if( !jau::io::uri_tk::is_local_file_protocol(path_or_uri) &&
450 {
451 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_URL>(path_or_uri, timeout);
452 if( nullptr != res && !res->fail() ) {
453 return res;
454 }
455 }
456 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_File>(path_or_uri);
457 if( nullptr != res && !res->fail() ) {
458 return res;
459 }
460 return nullptr;
461}
462
463ByteInStream_Feed::ByteInStream_Feed(std::string id_name, const jau::fraction_i64& timeout) noexcept
464: m_id(std::move(id_name)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
465 m_has_content_length( false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result( io::async_io_result_t::NONE ),
466 m_bytes_consumed(0)
467{ }
468
470 DBG_PRINT("ByteInStream_Feed: close.0 %s, %s", id().c_str(), to_string_int().c_str());
471
472 if( async_io_result_t::NONE == m_result ) {
473 m_result = async_io_result_t::SUCCESS; // signal end of streaming
474 }
475 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
476 DBG_PRINT("ByteInStream_Feed: close.X %s, %s", id().c_str(), to_string_int().c_str());
477}
478
479bool ByteInStream_Feed::available(size_t n) noexcept {
480 if( !good() || async_io_result_t::NONE != m_result ) {
481 // feeder completed, only remaining bytes in buffer available left
482 return m_buffer.size() >= n;
483 }
484 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
485 return false;
486 }
487 // I/O still in progress, we have to poll until data is available or timeout
488 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
489 bool timeout_occured;
490 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
491 if( avail < n ) {
492 if( timeout_occured ) {
493 setstate_impl( iostate::timeout );
494 if( async_io_result_t::NONE == m_result ) {
495 m_result = async_io_result_t::FAILED;
496 }
497 m_buffer.interruptWriter();
498 }
499 return false;
500 } else {
501 return true;
502 }
503}
504
505bool ByteInStream_Feed::is_open() const noexcept {
506 // feeder has not ended or remaining bytes in buffer available left
507 return async_io_result_t::NONE == m_result || m_buffer.size() > 0;
508}
509
510size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
511 if( 0 == length || !good() ) {
512 return 0;
513 }
514 bool timeout_occured = false;
515 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
516 const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
517 m_bytes_consumed += got;
518 if( timeout_occured ) {
519 setstate_impl( iostate::timeout );
520 if( async_io_result_t::NONE == m_result ) {
521 m_result = async_io_result_t::FAILED;
522 }
523 m_buffer.interruptWriter();
524 }
525 // DBG_PRINT("ByteInStream_Feed::read: size %zu/%zu bytes, timeout_occured %d, %s", got, length, timeout_occured, to_string_int().c_str() );
526 return got;
527}
528
529size_t ByteInStream_Feed::peek(void* out, size_t length, size_t peek_offset) noexcept {
530 (void)out;
531 (void)length;
532 (void)peek_offset;
533 ERR_PRINT("ByteInStream_Feed::peek not implemented");
534 return 0;
535}
536
538 if ( ( async_io_result_t::NONE != m_result && m_buffer.isEmpty() ) ||
539 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
540 {
542 }
543 if( async_io_result_t::FAILED == m_result ) {
545 }
546 return rdstate_impl();
547}
548
549bool ByteInStream_Feed::write(uint8_t in[], size_t length, const jau::fraction_i64& timeout) noexcept {
550 if( 0 < length && ( good() && async_io_result_t::NONE == m_result ) ) { // feeder still running
551 bool timeout_occured;
552 if( m_buffer.putBlocking(in, in+length, timeout, timeout_occured) ) {
553 m_total_xfered.fetch_add(length);
554 return true;
555 } else {
556 if( timeout_occured ) {
557 setstate_impl( iostate::timeout );
558 m_buffer.interruptWriter();
559 } else {
560 setstate_impl( iostate::failbit );
561 }
562 if( async_io_result_t::NONE == m_result ) {
563 m_result = async_io_result_t::FAILED;
564 }
565 return false;
566 }
567 } else {
568 return false;
569 }
570}
571
573 m_result = result;
574 m_buffer.set_end_of_input(true); // still considering last data, also irqs blocking ringbuffer reader
575}
576
577std::string ByteInStream_Feed::to_string_int() const noexcept {
578 return m_id+", ext[content_length "+( has_content_size() ? jau::to_decstring(m_content_size.load()) : "n/a" )+
579 ", xfered "+jau::to_decstring(m_total_xfered.load())+
580 ", result "+std::to_string((int8_t)m_result.load())+
581 "], consumed "+std::to_string(m_bytes_consumed)+
582 ", available "+std::to_string(get_available())+
583 ", iostate["+jau::io::to_string(rdstate())+
584 "], "+m_buffer.toString();
585}
586
587std::string ByteInStream_Feed::to_string() const noexcept {
588 return "ByteInStream_Feed["+to_string_int()+"]";
589}
590
593 m_parent.close();
594 DBG_PRINT("ByteInStream_Recorder: close.X %s", id().c_str());
595}
596
598 m_buffer.resize(0);
599 m_rec_offset = m_bytes_consumed;
600 m_is_recording = true;
601}
602
604 m_is_recording = false;
605}
606
608 m_is_recording = false;
609 m_buffer.clear();
610 m_rec_offset = 0;
611}
612
613size_t ByteInStream_Recorder::read(void* out, size_t length) noexcept {
614 const size_t consumed_bytes = m_parent.read(out, length);
615 m_bytes_consumed += consumed_bytes;
616 if( is_recording() ) {
617 uint8_t* out_u8 = static_cast<uint8_t*>(out);
618 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
619 }
620 return consumed_bytes;
621}
622
623std::string ByteInStream_Recorder::to_string() const noexcept {
624 return "ByteInStream_Recorder[parent "+m_parent.id()+", recording[on "+std::to_string(m_is_recording)+
625 " offset "+jau::to_decstring(m_rec_offset)+
626 "], consumed "+jau::to_decstring(m_bytes_consumed)+
627 ", iostate["+jau::io::to_string(rdstate())+"]]";
628}
629
630bool ByteOutStream::write(const uint8_t& in) noexcept {
631 return 1 == write(&in, 1);
632}
633
634size_t ByteOutStream_File::write(const void* out, size_t length) noexcept {
635 if( 0 == length || fail() ) {
636 return 0;
637 }
638 const uint8_t* out_u8 = static_cast<const uint8_t*>(out);
639 size_t total = 0;
640 while( total < length ) {
641 ssize_t len;
642 while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) {
643 if ( errno == EAGAIN || errno == EINTR ) {
644 // cont temp unavail or interruption
645 // unlikely for regular files and we open w/o O_NONBLOCK
646 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
647 // - EINTR (signal)
648 continue;
649 }
650 // Check errno == ETIMEDOUT ??
651 setstate_impl( iostate::failbit );
652 DBG_PRINT("ByteOutStream_File::write: Error occurred in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
653 return 0;
654 }
655 total += static_cast<size_t>(len);
656 if( 0 == len ) {
657 setstate_impl( iostate::failbit);
658 break;
659 }
660 }
661 m_bytes_consumed += total;
662 return total;
663}
664
666: stats(fd), m_fd(-1),
667 m_bytes_consumed(0)
668{
669 if( !stats.exists() || !stats.has_access() ) {
670 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
671 DBG_PRINT("ByteOutStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
672 } else {
673 m_fd = ::dup(fd);
674 if ( 0 > m_fd ) {
675 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
676 DBG_PRINT("ByteOutStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(), to_string().c_str());
677 }
678 // open file-descriptor is appending anyways
679 }
680}
681
682ByteOutStream_File::ByteOutStream_File(const int dirfd, const std::string& path, const jau::fs::fmode_t mode) noexcept
683: stats(), m_fd(-1),
684 m_bytes_consumed(0)
685{
687 // cut of leading `file://`
688 std::string path2 = path.substr(7);
689 stats = jau::fs::file_stats(dirfd, path2);
690 } else {
691 stats = jau::fs::file_stats(dirfd, path);
692 }
693 if( ( stats.exists() && !stats.is_file() && !stats.has_fd() ) || !stats.has_access() ) {
694 setstate_impl( iostate::failbit ); // Note: conforming with std::ofstream open (?)
695 DBG_PRINT("ByteOutStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", stats.to_string().c_str(), to_string().c_str());
696 } else {
697 // O_NONBLOCK, is useless on files and counter to this class logic
698 if( stats.has_fd() ) {
699 m_fd = ::dup( stats.fd() );
700 } else {
701 const int dst_flags = ( stats.exists() ? 0 : O_CREAT|O_EXCL ) | O_WRONLY|O_BINARY|O_NOCTTY;
702 m_fd = __posix_openat64(dirfd, stats.path().c_str(), dst_flags, jau::fs::posix_protection_bits( mode ));
703 }
704 if ( 0 > m_fd ) {
705 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
706 DBG_PRINT("ByteOutStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(), to_string().c_str());
707 }
708 if( stats.is_file() ) {
709 off64_t abs_pos = __posix_lseek64(m_fd, 0, SEEK_END);
710 if( 0 > abs_pos ) {
711 setstate_impl( iostate::failbit );
712 ERR_PRINT("Failed to position existing file to end %s, errno %d %s",
713 to_string().c_str(), errno, strerror(errno));
714 }
715 }
716 }
717}
718
719ByteOutStream_File::ByteOutStream_File(const std::string& path, const jau::fs::fmode_t mode) noexcept
720: ByteOutStream_File(AT_FDCWD, path, mode) {}
721
723 if( 0 <= m_fd ) {
724 ::close(m_fd);
725 m_fd = -1;
727 }
728}
729
730std::string ByteOutStream_File::to_string() const noexcept {
731 return "ByteOutStream_File[consumed "+jau::to_decstring(m_bytes_consumed)+
732 ", fd "+std::to_string(m_fd)+
733 ", iostate["+jau::io::to_string(rdstate())+
734 "], "+stats.to_string()+
735 "]";
736}
737
static void append_bitstr(std::string &out, T mask, T bit, const std::string &bitstr, bool &comma)
#define O_BINARY
Definition: byte_stream.cpp:51
#define IOSTATE_ENUM(X, M)
constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
Definition: byte_stream.cpp:76
#define __posix_lseek64
Definition: byte_stream.cpp:67
#define __posix_openat64
Definition: byte_stream.cpp:66
#define APPEND_BITSTR(U, V, W, M)
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
Definition: file_util.hpp:406
std::string to_string() const noexcept
Returns a comprehensive string representation of this element.
Definition: file_util.cpp:859
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void set_eof(const async_io_result_t result) noexcept
Set end-of-data (EOS), i.e.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
iostate rdstate() const noexcept override
Returns the current state flags.
bool write(uint8_t in[], size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
File based byte input stream, including named file descriptor.
std::string to_string() const noexcept override
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
ByteInStream_File(const std::string &path) noexcept
Construct a stream based byte input stream from filesystem path.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void clear_recording() noexcept
Clears the recording.
iostate rdstate() const noexcept override
Returns the current state flags.
void start_recording() noexcept
Starts the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
std::string to_string() const noexcept override
void stop_recording() noexcept
Stops the recording.
std::string to_string() const noexcept override
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate rdstate() const noexcept override
Returns the current state flags.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool is_open() const noexcept override
Checks if the stream has an associated file.
Abstract byte input stream object.
virtual void close() noexcept=0
Close the stream if supported by the underlying mechanism.
virtual std::string id() const noexcept
return the id of this data source
File based byte output stream, including named file descriptor.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
ByteOutStream_File(const std::string &path, const jau::fs::fmode_t mode=jau::fs::fmode_t::def_file_prot) noexcept
Construct a stream based byte output stream from filesystem path, either an existing or new file.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
std::string to_string() const noexcept override
virtual size_t write(const void *in, size_t length) noexcept=0
Write to the data sink.
constexpr iostate rdstate_impl() const noexcept
virtual iostate rdstate() const noexcept
Returns the current state flags.
constexpr void setstate_impl(iostate state) const noexcept
Size_type size() const noexcept
Returns the number of elements in this ring buffer.
void close(const bool zeromem=false) noexcept
Close this ringbuffer by releasing all elements available and resizing capacity to zero.
bool isEmpty() const noexcept
Returns true if this ring buffer is empty, otherwise false.
std::string toString() const noexcept
Returns a short string representation incl.
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition: debug.hpp:109
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition: debug.hpp:52
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
Definition: byte_util.hpp:155
std::string to_string(const alphabet &v) noexcept
Definition: base_codec.hpp:97
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
Definition: file_util.hpp:236
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
Definition: file_util.hpp:354
iostate
Mimic std::ios_base::iostate for state functionality, see iostate_func.
Definition: byte_stream.hpp:53
std::unique_ptr< ByteInStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition: io_util.cpp:204
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
Definition: io_util.cpp:309
async_io_result_t
Asynchronous I/O operation result value.
Definition: io_util.hpp:68
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
Definition: byte_stream.cpp:73
std::string to_string(const iostate mask) noexcept
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
Definition: io_util.cpp:194
@ goodbit
No error occurred nor has EOS been reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream.
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
constexpr T min(const T x, const T y) noexcept
Returns the minimum of two integrals (w/ branching) in O(1)
Definition: base_math.hpp:177
constexpr T max(const T x, const T y) noexcept
Returns the maximum of two integrals (w/ branching) in O(1)
Definition: base_math.hpp:191
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
Definition: backtrace.hpp:32
STL namespace.
CXX_ALWAYS_INLINE _Tp load() const noexcept