Gamp v0.0.8
Gamp: Graphics, Audio, Multimedia and Processing
Loading...
Searching...
No Matches
byte_stream.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 *
5 * ByteStream, ByteInStream_SecMemory and ByteStream_istream are derived from Botan under same license:
6 * - Copyright (c) 1999-2007 Jack Lloyd
7 * - Copyright (c) 2005 Matthew Gregan
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining
10 * a copy of this software and associated documentation files (the
11 * "Software"), to deal in the Software without restriction, including
12 * without limitation the rights to use, copy, modify, merge, publish,
13 * distribute, sublicense, and/or sell copies of the Software, and to
14 * permit persons to whom the Software is furnished to do so, subject to
15 * the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be
18 * included in all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 */
28
29// #include <botan_all.h>
30
31#include <cstddef>
32#include <cstring>
33#include <limits>
34#include <jau/cpuid.hpp>
35#include <jau/debug.hpp>
36
38#include <jau/io/io_util.hpp>
39#include <jau/io/bit_stream.hpp>
40
41extern "C" {
42 #include <unistd.h>
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46}
47
48#ifdef USE_LIBCURL
49 #include <curl/curl.h>
50#endif
51
52#include <thread>
53#include <pthread.h>
54
55#ifndef O_BINARY
56#define O_BINARY 0
57#endif
58#ifndef O_NONBLOCK
59#define O_NONBLOCK 0
60#endif
61
62using namespace jau::io;
63using namespace jau::fractions_i64_literals;
64using namespace jau::int_literals;
65
66#if defined(__FreeBSD__)
67 typedef off_t off64_t;
68 #define __posix_openat64 ::openat
69 #define __posix_lseek64 ::lseek
70#else
71 #define __posix_openat64 ::openat64
72 #define __posix_lseek64 ::lseek64
73#endif
74
75#ifdef USE_LIBCURL
76 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * (size_t)CURL_MAX_WRITE_SIZE;
77#else
78 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * 16384_uz;
79#endif
80
/**
 * Copies `n` bytes from `in` to `out`.
 *
 * The call is a no-op if either pointer is null or if `n` is zero.
 * The ranges must not overlap, since std::memcpy is used.
 */
inline constexpr static void copy_mem(void* out, const void* in, size_t n) noexcept {
    const bool nothing_to_do = ( nullptr == in ) || ( nullptr == out ) || ( 0 == n );
    if( nothing_to_do ) {
        return;
    }
    std::memcpy(out, in, n * sizeof(uint8_t));
}
86
87size_t ByteStream::discardRead(size_t n) noexcept {
88 if( !good() || !canRead() ) {
89 return 0;
90 }
91 uint8_t buf[1024] = { 0 };
92 size_t discarded = 0;
93
94 while(n)
95 {
96 const size_t got = read(buf, std::min(n, sizeof(buf)));
97 if( 0 == got ) {
98 break;
99 }
100 discarded += got;
101 n -= got;
102 }
103 return discarded;
104}
105
109 if( fail() ) {
110 return ByteStream::npos;
111 } else if( newPos > m_source.size() || newPos > std::numeric_limits<size_t>::max() ) {
112 return ByteStream::npos;
113 }
116 m_offset = static_cast<size_t>(newPos);
117 if( m_mark > newPos ) {
118 m_mark = npos;
119 }
120 if( m_source.size() == m_offset ) {
122 }
123 return m_offset;
124}
125
// Skips up to N bytes of the in-memory source by advancing m_offset.
// Returns 0 without moving if the stream is not good() or not readable;
// otherwise returns the number of bytes skipped, clamped to the bytes left in m_source.
126[[nodiscard]] size_t ByteStream_SecMemory::discard(size_t N) noexcept {
127 if( !good() || !canRead() ) {
128 return 0;
129 }
// Clamp to what is left between the current offset and the end of the buffer.
130 size_t n = std::min(N, m_source.size() - m_offset);
131 m_offset += n;
// NOTE(review): the body of this end-of-content branch is not visible in this listing
// (the listing numbering skips a line here) - presumably it flags EOF; confirm against the original file.
132 if( m_source.size() == m_offset ) {
134 }
135 return n;
136}
137
138[[nodiscard]] bool ByteStream_SecMemory::setMark(size_type) noexcept {
139 m_mark = m_offset;
140 return true;
141}
142
143[[nodiscard]] bool ByteStream_SecMemory::seekMark() noexcept {
144 if( npos == m_mark ) {
145 return false;
146 }
147 return m_mark == seek(m_mark);
148}
149
// Copies up to `length` bytes from the in-memory source at the current offset into `out`
// and advances m_offset by the number of bytes copied.
// Returns 0 if `length` is zero or the stream is not good() or not readable;
// otherwise returns the number of bytes copied, clamped to the bytes left in m_source.
150size_t ByteStream_SecMemory::read(void* out, size_t length) noexcept {
151 if( 0 == length || !good() || !canRead() ) {
152 return 0;
153 }
// Never read past the end of the buffer.
154 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
// copy_mem() itself skips the copy when `out` is null or `got` is zero.
155 copy_mem(out, m_source.data() + m_offset, got);
156 m_offset += got;
// NOTE(review): the body of this end-of-content branch is not visible in this listing
// (the listing numbering skips a line here) - presumably it flags EOF; confirm against the original file.
157 if( m_source.size() == m_offset ) {
159 }
160 return got;
161}
162
163bool ByteStream_SecMemory::available(size_t n) noexcept {
164 return canRead() && m_source.size() - m_offset >= n;
165}
166
// Copies up to `length` bytes, starting `peek_offset` bytes ahead of the current offset,
// into `out` without modifying m_offset.
// Returns 0 if `length` is zero, the stream is not good() or not readable, `peek_offset`
// does not fit into size_t, or not even one byte exists at the requested offset.
// Otherwise returns the number of bytes copied.
// NOTE(review): the listing numbering skips lines inside this function (after the signature and
// after the guard), so statements may be missing from this view.
167size_t ByteStream_SecMemory::peek(void* out, size_t length, size_type peek_offset) noexcept {
170 const size_t bytes_left = m_source.size() - m_offset;
171 if( 0 == length || !good() || !canRead() ||
172 ( peek_offset > std::numeric_limits<size_t>::max() ) ||
173 ( bytes_left < peek_offset + 1 /* min number of bytes to read */ ) ) {
174 return 0;
175 }
// Safe narrowing: peek_offset was range-checked against size_t above.
177 const size_t po_sz = static_cast<size_t>(peek_offset);
// Clamp to the bytes that exist behind the peek position.
178 const size_t peek_len = std::min(bytes_left - po_sz, length);
179 copy_mem(out, &m_source[m_offset + po_sz], peek_len);
180 return peek_len;
181}
182
183[[nodiscard]] size_t ByteStream_SecMemory::write(const void* out, size_t length) noexcept {
184 if( 0 == length || fail() || !canWrite() ) {
185 return 0;
186 }
187 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
188 copy_mem(m_source.data() + m_offset, out, got);
189 m_offset += got;
190 return got;
191}
192
195 m_source(cast_char_ptr_to_uint8(in.data()),
196 cast_char_ptr_to_uint8(in.data()) + in.length()),
197 m_offset(0), m_mark(npos)
198{ }
199
201 m_source.clear();
202 m_offset = 0;
204}
205
206std::string ByteStream_SecMemory::toString() const noexcept {
207 return "ByteInStream_SecMemory[content size "+jau::to_decstring(m_source.size())+
208 ", consumed "+jau::to_decstring(m_offset)+
209 ", available "+jau::to_decstring(m_source.size()-m_offset)+
210 ", iomode["+jau::io::to_string(m_iomode)+
211 ", iostate["+jau::io::to_string(rdstate())+
212 "]]";
213}
214
216 if( fail() ||
217 !m_has_content_length ||
218 ( !canWrite() && newPos > m_content_size ) ||
219 newPos > std::numeric_limits<off64_t>::max() )
220 {
221 return ByteStream::npos;
222 }
224 if( newPos != m_offset ) {
225 off64_t abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(newPos), SEEK_SET);
226 if( 0 > abs_pos ) {
228 jau_ERR_PRINT("Failed to seek to position %" PRIu64 " of existing file %s", newPos, toString());
229 return ByteStream::npos;
230 }
231 m_offset = abs_pos;
232 if( m_mark > m_offset ) {
233 m_mark = npos;
234 }
235 if( m_content_size == m_offset ) {
237 }
238 }
239 return m_offset;
240}
241
242[[nodiscard]] size_t ByteStream_File::discard(size_t N) noexcept {
243 if( !canRead() ) { return 0; }
244 if( m_has_content_length ) {
245 if( !good() ) {
246 return 0;
247 }
248 const size_t n = std::min<size_t>(N, m_content_size - m_offset);
249 const size_t p0 = m_offset;
250 return seek(p0 + n) - p0;
251 } else {
252 return discardRead(N);
253 }
254}
255
256[[nodiscard]] bool ByteStream_File::setMark(size_type) noexcept {
257 m_mark = m_offset;
258 return true;
259}
260
261[[nodiscard]] bool ByteStream_File::seekMark() noexcept {
262 if( npos == m_mark ) {
263 return false;
264 }
265 return m_mark == seek(m_mark);
266}
267
// Reads up to `length` bytes from the file descriptor into `out`, looping over short reads.
// Returns 0 if `length` is zero or the stream is not good() or not readable.
// On success returns the number of bytes read and advances m_offset by that amount.
// The eofbit is added once ::read() delivers zero bytes or the known content size is reached.
268[[nodiscard]] size_t ByteStream_File::read(void* out, size_t length) noexcept {
269 if( 0 == length || !good() || !canRead() ) {
270 return 0;
271 }
272 uint8_t* out_u8 = static_cast<uint8_t*>(out);
273 size_t total = 0;
// Outer loop: continue after short reads until `length` bytes arrived or EOF is hit.
274 while( total < length ) {
275 ssize_t len;
// Inner loop: retry the same ::read() on transient errors.
276 while ( ( len = ::read(m_fd, out_u8+total, length-total) ) < 0 ) {
277 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
278 // cont temp unavail or interruption
279 // unlikely for regular files and we open w/o O_NONBLOCK
280 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
281 // - EINTR (signal)
282 continue;
283 }
284 // Check errno == ETIMEDOUT ??
// NOTE(review): the listing numbering skips a line here, a statement may be missing from this view.
// NOTE(review): returning 0 here drops bytes already read in earlier iterations (`total`) and leaves
// m_offset behind the descriptor position - confirm this is intended.
286 jau_DBG_PRINT("ByteInStream_File::read: Error occurred in %s, errno %d %s", toString(), errno, strerror(errno));
287 return 0;
288 }
289 total += static_cast<size_t>(len);
290 if( 0 == len || ( m_has_content_length && m_offset + total >= m_content_size ) ) {
291 addstate_impl( iostate_t::eofbit ); // Note: std::istream also sets iostate::failbit on eof, we don't.
292 break;
293 }
294 }
295 m_offset += total;
296 return total;
297}
298
299size_t ByteStream_File::peek(void* out, size_t length, size_type peek_offset) noexcept {
300 const size_type bytes_left = ( m_has_content_length ? m_content_size : std::numeric_limits<off64_t>::max() ) - m_offset;
301 if( 0 == length || !good() || !canRead() ||
302 ( peek_offset > std::numeric_limits<off64_t>::max() ) ||
303 ( bytes_left < peek_offset + 1 /* min number of bytes to read */ ) ) {
304 return 0;
305 }
306 size_t got = 0;
307
308 off64_t abs_pos = 0;
309 if( 0 < peek_offset ) {
310 abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(peek_offset), SEEK_CUR);
311 if( 0 > abs_pos ) {
313 jau_DBG_PRINT("ByteInStream_File::peek: Error occurred (offset1 %" PRIu64 ") in %s, errno %d %s",
314 peek_offset, toString(), errno, strerror(errno));
315 return 0;
316 }
317 }
318 if( abs_pos == static_cast<off64_t>(peek_offset) ) { // NOLINT(modernize-use-integer-sign-comparison): false positive
319 ssize_t len = 0;
320 while ( ( len = ::read(m_fd, out, length) ) < 0 ) {
321 if ( errno == EAGAIN || errno == EINTR ) {
322 // cont temp unavail or interruption
323 continue;
324 }
325 // Check errno == ETIMEDOUT ??
327 jau_DBG_PRINT("ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s", toString(), errno, strerror(errno));
328 return 0;
329 }
330 got = len; // potentially zero bytes, i.e. eof
331 }
332 if( __posix_lseek64(m_fd, static_cast<off64_t>(m_offset), SEEK_SET) < 0 ) {
333 // even though we were able to fetch the desired data above, let's fail if position reset fails
335 jau_DBG_PRINT("ByteInStream_File::peek: Error occurred (offset2 %" PRIu64 ") in %s, errno %d %s",
336 peek_offset, toString(), errno, strerror(errno));
337 return 0;
338 }
339 return got;
340}
341
342bool ByteStream_File::available(size_t n) noexcept {
343 return isOpen() && good() && canRead() && ( !m_has_content_length || m_content_size - m_offset >= (size_type)n );
344}
345
// Writes up to `length` bytes from `out` to the file descriptor, looping over short writes.
// Returns 0 if `length` is zero, the stream has failed or is not writable.
// On success returns the number of bytes written, advances m_offset by that amount and
// grows m_content_size if the known content size has been exceeded.
346size_t ByteStream_File::write(const void* out, size_t length) noexcept {
347 if( 0 == length || fail() || !canWrite() ) {
348 return 0;
349 }
350 const uint8_t* out_u8 = static_cast<const uint8_t*>(out);
351 size_t total = 0;
// Outer loop: continue after short writes until all bytes are written.
352 while( total < length ) {
353 ssize_t len;
// Inner loop: retry the same ::write() on transient errors.
354 while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) {
355 if ( errno == EAGAIN || errno == EINTR ) {
356 // cont temp unavail or interruption
357 // unlikely for regular files and we open w/o O_NONBLOCK
358 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
359 // - EINTR (signal)
360 continue;
361 }
362 // Check errno == ETIMEDOUT ??
// NOTE(review): the listing numbering skips a line here, a statement may be missing from this view.
// NOTE(review): returning 0 here drops the count of bytes already written (`total`) - confirm intent.
364 jau_DBG_PRINT("ByteOutStream_File::write: Error occurred in %s, errno %d %s", toString(), errno, strerror(errno));
365 return 0;
366 }
367 total += static_cast<size_t>(len);
// A zero-length write ends the loop. NOTE(review): a line before `break` is not visible in this listing.
368 if( 0 == len ) {
370 break;
371 }
372 }
373 m_offset += total;
// Track growth of the file. NOTE(review): a line after the assignment is not visible in this listing.
374 if( m_has_content_length && m_offset > m_content_size ) {
375 m_content_size = m_offset;
377 }
378 return total;
379}
380
381static bool _jau_file_size(const int fd, const jau::io::fs::file_stats& stats, const off64_t cur_pos, ByteStream::size_type& len) noexcept {
382 if( stats.has( jau::io::fs::file_stats::field_t::size ) ) {
383 len = stats.size();
384 return true;
385 }
386 if( !stats.is_file() ) {
387 return false; // possible pipe or stdin etc
388 }
389 // should have been covered by stats, but try traditionally harder
390 const off64_t end = __posix_lseek64(fd, 0, SEEK_END);
391 if( 0 > end ) {
392 return false;
393 }
394 const off64_t cur_pos2 = __posix_lseek64(fd, cur_pos, SEEK_SET);
395 if( cur_pos2 != cur_pos ) {
396 jau_DBG_PRINT("ByteInStream_File::file_size: Error rewinding to current position failed, orig-pos %" PRIi64 " -> %" PRIi64 ", errno %d %s.",
397 cur_pos, cur_pos2, errno, strerror(errno));
398 return false;
399 }
400 len = ByteStream::size_type(end) + 1_u64;
401 return true;
402}
403
406 m_stats(fd), m_fd(-1), m_has_content_length(false), m_content_size(0),
407 m_offset(0), m_mark(npos)
408{
409 if( !m_stats.exists() || !m_stats.has_access() ) {
410 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
411 jau_DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", m_stats.toString(), toString());
412 return;
413 }
414 m_fd = ::dup(fd);
415 if ( 0 > m_fd ) {
416 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
417 jau_DBG_PRINT("ByteInStream_File::ctor: Error occurred in %s, %s", m_stats.toString(), toString());
418 return;
419 }
420 const off64_t cur_pos = __posix_lseek64(m_fd, 0, SEEK_CUR);
421 if( 0 > cur_pos ) {
423 jau_ERR_PRINT("Failed to read position of existing file %s, errno %d %s", toString(), errno, strerror(errno));
424 return;
425 }
426 m_offset = cur_pos;
427 if( _jau_file_size(m_fd, m_stats, cur_pos, m_content_size) ) {
428 m_has_content_length = true;
429 }
430}
431
// Constructs a file stream for `path`, resolved relative to directory descriptor `dirfd`.
// Any failure below adds failbit and returns early, leaving m_fd at -1 unless the descriptor was already obtained.
// Steps: gather file stats, validate io-mode and file kind, obtain the descriptor (dup of an
// existing fd or openat), then determine the current position and content size.
432ByteStream_File::ByteStream_File(const int dirfd, const std::string& path, iomode_t iomode, const jau::io::fs::fmode_t fmode, lb_endian_t byteOrder) noexcept
433: ByteStream(iomode, byteOrder),
434 m_stats(), m_fd(-1), m_has_content_length(false), m_content_size(0),
435 m_offset(0), m_mark(npos)
436{
// NOTE(review): the `if( ... ) {` line guarding this branch is not visible in this listing;
// per the comment below it presumably tests for a leading `file://` scheme.
438 // cut off leading `file://`
439 std::string path2 = path.substr(7);
440 m_stats = jau::io::fs::file_stats(dirfd, path2);
441 } else {
442 m_stats = jau::io::fs::file_stats(dirfd, path);
443 }
// The io-mode must allow reading and/or writing.
444 if( !canRead() && !canWrite() ) {
445 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
446 jau_DBG_PRINT("ByteStream_File::ctor: Error, iomode_t invalid: %s, %s", m_iomode, toString());
447 return;
448 }
// Reject existing entries which are neither a file nor an fd, and anything not accessible.
449 if( ( m_stats.exists() && !m_stats.is_file() && !m_stats.has_fd() ) || !m_stats.has_access() ) {
450 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ofstream open (?)
451 jau_DBG_PRINT("ByteStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", m_stats.toString(), toString());
452 return;
453 }
// A read-only stream requires an existing file.
454 if( !canWrite() && !m_stats.exists() ) {
455 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
456 jau_DBG_PRINT("ByteStream_File::ctor: Error, can't open non-existing read-only file in %s, %s", m_stats.toString(), toString());
457 return;
458 }
459 bool truncated = false;
460 bool just_opened = false;
461 if( m_stats.has_fd() ) {
// The stats refer to an already open descriptor: work on a duplicate of it.
462 m_fd = ::dup( m_stats.fd() );
463 } else {
464 // O_NONBLOCK, is useless on files and counter to this class logic
465 int flags = O_BINARY|O_NOCTTY;
// Map the io-mode onto the access mode flag.
466 if( canRead() && !canWrite() ) {
467 flags |= O_RDONLY;
468 } else if( !canRead() && canWrite() ) {
469 flags |= O_WRONLY;
470 } else {
471 flags |= O_RDWR;
472 }
// Create a missing file; truncate an existing one only if writable and requested via iomode_t::trunc.
473 if( !m_stats.exists() ) {
474 flags |= O_CREAT;
475 } else if( canWrite() && is_set(m_iomode, iomode_t::trunc) ) {
476 flags |= O_TRUNC;
477 truncated = true;
478 }
479 m_fd = __posix_openat64(dirfd, m_stats.path().c_str(), flags, jau::io::fs::posix_protection_bits( fmode ));
480 just_opened = true;
481 }
482 if ( 0 > m_fd ) {
483 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
484 jau_DBG_PRINT("ByteInStream_File::ctor: Error while opening %s, %s", m_stats.toString(), toString());
485 return;
486 }
// NOTE(review): on truncation m_has_content_length stays false, only size and offset are known to be 0.
487 if( truncated ) {
488 // m_content_size = 0, m_offset = 0
489 return;
490 }
// A freshly opened descriptor or a non-file starts at 0; a duplicated file descriptor is queried for its position.
491 const off64_t cur_pos = just_opened || !m_stats.is_file() ? 0 : __posix_lseek64(m_fd, 0, SEEK_CUR);
492 if( 0 > cur_pos ) {
// NOTE(review): the listing numbering skips a line here, a statement may be missing from this view.
494 jau_ERR_PRINT("Failed to read position of existing file %s, errno %d %s", toString(), errno, strerror(errno));
495 return;
496 }
497 m_offset = cur_pos;
498 if( _jau_file_size(m_fd, m_stats, cur_pos, m_content_size) ) {
499 m_has_content_length = true;
500 }
// iomode_t::atend: position a regular file at its end.
501 if( m_stats.is_file() && is_set(m_iomode, iomode_t::atend) ) {
502 const off64_t end = __posix_lseek64(m_fd, 0, SEEK_END);
503 if( 0 > end ) {
// NOTE(review): the listing numbering skips a line here, a statement may be missing from this view.
505 jau_ERR_PRINT("Failed to position existing file to end %s, errno %d %s", toString(), errno, strerror(errno));
506 return;
507 }
508 m_offset = end;
509 }
510}
511
513: ByteStream_File(AT_FDCWD, path, mode, fmode, byteOrder) {}
514
// Closes the file descriptor if one is held and resets m_fd to -1; otherwise does nothing.
515void ByteStream_File::close() noexcept {
516 if( 0 <= m_fd ) {
517 ::close(m_fd);
518 m_fd = -1;
// NOTE(review): the listing numbering skips a line here, a statement may be missing from this view.
520 }
521}
522
523void ByteStream_File::flush() noexcept {
524 if( 0 <= m_fd && canWrite() ) {
525 ::fdatasync(m_fd);
526 }
527}
528
529std::string ByteStream_File::toString() const noexcept {
530 return "ByteInStream_File[content_length "+( hasContentSize() ? jau::to_decstring(m_content_size) : "n/a" )+
531 ", consumed "+jau::to_decstring(m_offset)+
532 ", available "+jau::to_decstring(get_available())+
533 ", fd "+std::to_string(m_fd)+
534 ", iomode["+jau::io::to_string(m_iomode)+
535 ", iostate["+jau::io::to_string(rdstate())+
536 "], "+m_stats.toString()+
537 "]";
538}
539
540
543 m_url(std::move(url)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
544 m_stream_resp( read_url_stream_async(nullptr, m_url, /*httpPostReq=*/nullptr, &m_buffer, AsyncStreamConsumerFunc()) ),
545 m_offset(0), m_mark(npos), m_rewindbuf()
546{ }
547
548void ByteInStream_URL::close() noexcept {
549 jau_DBG_PRINT("ByteInStream_URL: close.0 %s, %s", id(), to_string_int());
550
551 if( m_stream_resp->processing() ) {
552 m_stream_resp->result = io_result_t::SUCCESS; // signal end of streaming
553 }
554
555 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
556 if( m_stream_resp->thread.joinable() ) {
557 jau_DBG_PRINT("ByteInStream_URL: close.1 %s, %s", id(), m_buffer.toString());
558 m_stream_resp->thread.join();
559 }
560 std::thread none;
561 m_stream_resp->thread.swap(none);
562 jau_DBG_PRINT("ByteInStream_URL: close.X %s, %s", id(), to_string_int());
563}
564
// Tells whether `n` bytes are available, waiting up to m_timeout for them while the transfer is running.
// Returns true once the ringbuffer holds at least `n` bytes, otherwise false.
565bool ByteInStream_URL::available(size_t n) noexcept {
566 if( !good() || !canRead() || !m_stream_resp->processing() ) {
567 // url thread ended (or stream unusable): only the bytes remaining in the buffer count
568 return m_buffer.size() >= n;
569 }
// Wait for the response header so has_content_length / content_length can be consulted.
570 m_stream_resp->header_resp.wait_until_completion(m_timeout);
// With a known content length, fewer than `n` remaining bytes can never become available.
571 if( m_stream_resp->has_content_length && m_stream_resp->content_length - m_offset < n ) {
572 return false;
573 }
574 // I/O still in progress, we have to poll until data is available or timeout
575 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
576 bool timeout_occured;
577 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
578 if( avail < n ) {
579 if( timeout_occured ) {
// NOTE(review): the listing numbering skips a line here, a statement may be missing from this view.
// On timeout a still running transfer is flagged FAILED and the ringbuffer writer is interrupted.
581 if( m_stream_resp->processing() ) {
582 m_stream_resp->result = io_result_t::FAILED;
583 }
584 m_buffer.interruptWriter();
585 }
586 return false;
587 } else {
588 return true;
589 }
590}
591
592bool ByteInStream_URL::isOpen() const noexcept {
593 // url thread has not ended or remaining bytes in buffer available left
594 return m_stream_resp->processing() || m_buffer.size() > 0;
595}
596
597bool ByteInStream_URL::hasContentSize() const noexcept {
598 m_stream_resp->header_resp.wait_until_completion(m_timeout);
599 return m_stream_resp->has_content_length;
600}
601
603 m_stream_resp->header_resp.wait_until_completion(m_timeout);
604 const size_type length = contentSize();
605 if( fail() ) {
606 return ByteStream::npos;
607 } else if( m_rewindbuf.covered(m_mark, newPos) ){
609 m_offset = newPos;
610 return m_offset;
611 } else if( !hasContentSize() ) {
612 return ByteStream::npos;
613 } else if( newPos > length ) {
614 return ByteStream::npos;
615 } else if(newPos >= m_offset) {
617 discardRead(newPos - m_offset);
618 return m_offset;
619 } else {
620 jau_DBG_PRINT("ByteInStream_URL::seek newPos %" PRIu64 "< position %" PRIu64 " not implemented", newPos, m_offset);
621 return ByteStream::npos;
622 }
623}
624
625[[nodiscard]] size_t ByteInStream_URL::discard(size_t N) noexcept {
626 return discardRead(N);
627}
628
629[[nodiscard]] bool ByteInStream_URL::setMark(size_type readLimit) noexcept {
630 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
631 m_mark = m_offset;
632 return true;
633 } else {
634 return false;
635 }
636}
637
638[[nodiscard]] bool ByteInStream_URL::seekMark() noexcept {
639 if( npos == m_mark ) {
640 return false;
641 }
642 return m_mark == seek(m_mark);
643}
644
645size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
646 m_stream_resp->header_resp.wait_until_completion(m_timeout);
647 if( 0 == length || !good() || !canRead() ) {
648 return 0;
649 }
650 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
651}
652
653size_t ByteInStream_URL::peek(void* out, size_t length, size_type peek_offset) noexcept {
654 (void)out;
655 (void)length;
656 (void)peek_offset;
657 jau_DBG_PRINT("ByteInStream_URL::peek not implemented");
658 return 0;
659}
660
662 if ( ( !m_stream_resp->processing() && m_buffer.isEmpty() ) ||
663 ( m_stream_resp->has_content_length && m_offset >= m_stream_resp->content_length ) )
664 {
666 }
667 if( m_stream_resp->failed() ) {
669 }
670 return rdstate_impl();
671}
672
673std::string ByteInStream_URL::to_string_int() const noexcept {
674 return m_url+", Url[content_length "+( hasContentSize() ? jau::to_decstring(m_stream_resp->content_length.load()) : "n/a" )+
675 ", xfered "+jau::to_decstring(m_stream_resp->total_read.load())+
676 ", result "+std::to_string((int8_t)m_stream_resp->result.load())+
677 "], consumed "+jau::to_decstring(m_offset)+
678 ", available "+jau::to_decstring(get_available())+
679 ", iomode["+jau::io::to_string(m_iomode)+
680 ", iostate["+jau::io::to_string(rdstate())+
681 "], rewind[mark "+std::to_string(m_mark)+", "+m_rewindbuf.toString()+
682 "], "+m_buffer.toString();
683}
684std::string ByteInStream_URL::toString() const noexcept {
685 return "ByteInStream_URL["+to_string_int()+"]";
686}
687
// Creates an input stream for `path_or_uri`.
// If the argument is not a local-file URI (plus a second condition not visible here), a
// ByteInStream_URL using `timeout` is tried first. Otherwise, or if that stream failed,
// a read-only ByteStream_File is tried. Returns nullptr if no stream could be opened without failure.
688std::unique_ptr<ByteStream> jau::io::to_ByteInStream(const std::string& path_or_uri, jau::fraction_i64 timeout) noexcept {
// NOTE(review): the second operand of this condition is not visible in this listing
// (the listing numbering skips a line here); confirm against the original file.
689 if( !jau::io::uri_tk::is_local_file_protocol(path_or_uri) &&
691 {
692 std::unique_ptr<ByteStream> res = std::make_unique<ByteInStream_URL>(path_or_uri, timeout);
693 if( nullptr != res && !res->fail() ) {
694 return res;
695 }
696 }
// Fall back to treating the argument as a file path, opened for reading.
697 std::unique_ptr<ByteStream> res = std::make_unique<ByteStream_File>(path_or_uri, iomode_t::read);
698 if( nullptr != res && !res->fail() ) {
699 return res;
700 }
701 return nullptr;
702}
703
706 m_id(std::move(id_name)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
707 m_has_content_length( false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result( io::io_result_t::NONE ),
708 m_offset(0)
709{ }
710
712 jau_DBG_PRINT("ByteInStream_Feed: close.0 %s, %s", id(), toStringInt());
713
714 if( io_result_t::NONE == m_result ) {
715 m_result = io_result_t::SUCCESS; // signal end of streaming
716 }
717 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
718 jau_DBG_PRINT("ByteInStream_Feed: close.X %s, %s", id(), toStringInt());
719}
720
// Tells whether `n` bytes are available, waiting up to m_timeout for them while the feeder is active.
// Returns true once the ringbuffer holds at least `n` bytes, otherwise false.
721bool ByteInStream_Feed::available(size_t n) noexcept {
722 if( !good() || !canRead() || io_result_t::NONE != m_result ) {
723 // feeder completed (or stream unusable): only the bytes remaining in the buffer count
724 return m_buffer.size() >= n;
725 }
// With a known content size, fewer than `n` remaining bytes can never become available.
726 if( m_has_content_length && m_content_size - m_offset < n ) {
727 return false;
728 }
729 // I/O still in progress, we have to poll until data is available or timeout
730 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
731 bool timeout_occured;
732 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
733 if( avail < n ) {
734 if( timeout_occured ) {
// NOTE(review): the listing numbering skips a line here, a statement may be missing from this view.
// On timeout a still undecided result is flagged FAILED and the ringbuffer writer is interrupted.
736 if( io_result_t::NONE == m_result ) {
737 m_result = io_result_t::FAILED;
738 }
739 m_buffer.interruptWriter();
740 }
741 return false;
742 } else {
743 return true;
744 }
745}
746
747bool ByteInStream_Feed::isOpen() const noexcept {
748 // feeder has not ended or remaining bytes in buffer available left
749 return io_result_t::NONE == m_result || m_buffer.size() > 0;
750}
751
753 const size_type length = m_content_size;
754 if( fail() ) {
755 return ByteStream::npos;
756 } else if( m_rewindbuf.covered(m_mark, newPos) ){
758 m_offset = newPos;
759 return m_offset;
760 } else if( !hasContentSize() ) {
761 return ByteStream::npos;
762 } else if( newPos > length ) {
763 return ByteStream::npos;
764 } else if(newPos >= m_offset) {
766 discardRead(newPos - m_offset);
767 return m_offset;
768 } else {
769 jau_DBG_PRINT("ByteInStream_Feed::seek newPos %" PRIu64 "< position %" PRIu64 " not implemented", newPos, m_offset);
770 return ByteStream::npos;
771 }
772}
773
774[[nodiscard]] size_t ByteInStream_Feed::discard(size_t N) noexcept {
775 return discardRead(N);
776}
777
778[[nodiscard]] bool ByteInStream_Feed::setMark(size_type readLimit) noexcept {
779 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
780 m_mark = m_offset;
781 return true;
782 } else {
783 return false;
784 }
785}
786
787[[nodiscard]] bool ByteInStream_Feed::seekMark() noexcept {
788 if( npos == m_mark ) {
789 return false;
790 }
791 return m_mark == seek(m_mark);
792}
793
794size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
795 if( 0 == length || !good() || !canRead() ) {
796 return 0;
797 }
798 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
799}
800
801size_t ByteInStream_Feed::peek(void* out, size_t length, size_type peek_offset) noexcept {
802 (void)out;
803 (void)length;
804 (void)peek_offset;
805 jau_DBG_PRINT("ByteInStream_Feed::peek not implemented");
806 return 0;
807}
808
810 if ( ( io_result_t::NONE != m_result && m_buffer.isEmpty() ) ||
811 ( m_has_content_length && m_offset >= m_content_size ) )
812 {
814 }
815 if( io_result_t::FAILED == m_result ) {
817 }
818 return rdstate_impl();
819}
820
// Feeds `length` bytes from `in` into the ringbuffer, blocking up to `timeout`.
// Returns `length` if all bytes were put into the buffer, otherwise 0.
// Nothing is written if `length` is zero, the stream is not writable or not good(),
// or a result has already been set (m_result != NONE).
821size_t ByteInStream_Feed::write(const void* in, size_t length, const jau::fraction_i64& timeout) noexcept {
822 if( 0 < length && canWrite() && good() && io_result_t::NONE == m_result ) { // feeder still running
823 bool timeout_occured;
824 const uint8_t *in8 = reinterpret_cast<const uint8_t*>(in);
825 if( m_buffer.putBlocking(in8, in8+length, timeout, timeout_occured) ) {
// Account the transferred bytes.
826 m_total_xfered.fetch_add(length);
827 return length;
828 } else {
// NOTE(review): the listing numbering skips one line in each of the two branches below,
// statements may be missing from this view (the else branch appears empty here).
829 if( timeout_occured ) {
831 m_buffer.interruptWriter();
832 } else {
834 }
// A failed put marks the feed as FAILED unless a result has been set meanwhile.
835 if( io_result_t::NONE == m_result ) {
836 m_result = io_result_t::FAILED;
837 }
838 return 0;
839 }
840 } else {
841 return 0;
842 }
843}
844
845void ByteInStream_Feed::setEOF(const io_result_t result) noexcept {
846 m_result = result;
847 m_buffer.set_end_of_input(true); // still considering last data, also irqs blocking ringbuffer reader
848}
849
850std::string ByteInStream_Feed::toStringInt() const noexcept {
851 return m_id+", ext[content_length "+( hasContentSize() ? jau::to_decstring(m_content_size.load()) : "n/a" )+
852 ", xfered "+jau::to_decstring(m_total_xfered.load())+
853 ", result "+std::to_string((int8_t)m_result.load())+
854 "], consumed "+std::to_string(m_offset)+
855 ", available "+std::to_string(getAvailable())+
856 ", iomode["+jau::io::to_string(m_iomode)+
857 ", iostate["+jau::io::to_string(rdstate())+
858 "], "+m_buffer.toString();
859}
860
861std::string ByteInStream_Feed::toString() const noexcept {
862 return "ByteInStream_Feed["+toStringInt()+"]";
863}
864
867 m_parent.close();
868 jau_DBG_PRINT("ByteInStream_Recorder: close.X %s", id());
869}
870
872 m_buffer.resize(0);
873 m_rec_offset = m_offset;
874 m_is_recording = true;
875}
876
878 m_is_recording = false;
879}
880
882 m_is_recording = false;
883 m_buffer.clear();
884 m_rec_offset = 0;
885}
886
887size_t ByteStream_Recorder::read(void* out, size_t length) noexcept {
888 const size_t consumed_bytes = m_parent.read(out, length);
889 m_offset += consumed_bytes;
890 if( isRecording() && consumed_bytes > 0 ) {
891 uint8_t* out_u8 = static_cast<uint8_t*>(out);
892 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
893 }
894 return consumed_bytes;
895}
896
897size_t ByteStream_Recorder::write(const void* out, size_t length) noexcept {
898 const size_t consumed_bytes = m_parent.write(out, length);
899 m_offset += consumed_bytes;
900 if( isRecording() && consumed_bytes > 0 ) {
901 const uint8_t* out_u8 = reinterpret_cast<const uint8_t*>(out);
902 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
903 }
904 return consumed_bytes;
905}
906
907std::string ByteStream_Recorder::toString() const noexcept {
908 return "ByteInStream_Recorder[parent "+m_parent.id()+", recording[on "+std::to_string(m_is_recording)+
909 " offset "+jau::to_decstring(m_rec_offset)+
910 "], consumed "+jau::to_decstring(m_offset)+
911 ", iomode["+jau::io::to_string(m_iomode)+
912 ", iostate["+jau::io::to_string(rdstate())+"]]";
913}
914
915std::string jau::io::to_string(const ioaccess_t v) noexcept {
916 return v == ioaccess_t::read ? "read" : "write";
917}
String toString()
static constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
static bool _jau_file_size(const int fd, const jau::io::fs::file_stats &stats, const off64_t cur_pos, ByteStream::size_type &len) noexcept
#define O_BINARY
#define __posix_lseek64
#define __posix_openat64
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
std::string toString() const noexcept override
void setEOF(const io_result_t result) noexcept
Set end-of-data (EOS), i.e.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Not implemented, returns 0.
iostate_t rdstate() const noexcept override
Returns the current state flags.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t write(const void *in, size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
size_t read(void *out, size_t length) noexcept override
Read from the source.
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
std::string toString() const noexcept override
bool isOpen() const noexcept override
Checks if the stream has an associated file.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate_t rdstate() const noexcept override
Returns the current state flags.
size_t read(void *out, size_t length) noexcept override
Read from the source.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_type contentSize() const noexcept override
Returns the content_size if known.
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void flush() noexcept override
Synchronizes all output operations, or do nothing.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
std::string toString() const noexcept override
ByteStream_File(const std::string &path, iomode_t iomode=iomode_t::rw, const jau::io::fs::fmode_t fmode=fs::fmode_t::def_file_prot, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a stream based byte stream from filesystem path, either an existing or new file.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
std::string toString() const noexcept override
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void startRecording() noexcept
Starts the recording.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
void stopRecording() noexcept
Stops the recording.
size_t read(void *, size_t) noexcept override
Read from the source.
iostate_t rdstate() const noexcept override
Returns the current state flags.
void clearRecording() noexcept
Clears the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string toString() const noexcept override
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
ByteStream_SecMemory(const std::string &in, lb_endian_t byteOrder=lb_endian_t::little)
Construct a secure memory source that reads from a string, iomode_t::read.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
constexpr bool canWrite() const noexcept
Returns true in case stream has iomode::write capabilities.
constexpr bool canRead() const noexcept
Returns true in case stream has iomode::read capabilities.
size_t discardRead(size_t n) noexcept
Fallback slow discard implementation using read() in case of unknown stream size.
constexpr lb_endian_t byteOrder() const noexcept
Returns endian byte-order of stream storage.
uint64_t size_type
uint64_t size data type, bit position and count
ByteStream(iomode_t mode, lb_endian_t byteOrder=lb_endian_t::little) noexcept
constexpr iomode_t mode() const noexcept
static constexpr size_type npos
Invalid position constant, denoting unset mark() or invalid position.
bool fail() const noexcept
Checks if an error has occurred.
virtual iostate_t rdstate() const noexcept
Returns the current state flags.
void clearStateFlags(const iostate_t clr) noexcept
Clears given state flags from current value.
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
constexpr iostate_t rdstate_impl() const noexcept
constexpr void addstate_impl(iostate_t state) const noexcept
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
#define jau_DBG_PRINT(fmt,...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition debug.hpp:102
#define jau_ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition debug.hpp:152
lb_endian_t
Simplified reduced endian type only covering little- and big-endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
#define PRAGMA_DISABLE_WARNING_TYPE_RANGE_LIMIT
constexpr bool is_set(const E mask, const E bits) noexcept
#define PRAGMA_DISABLE_WARNING_PUSH
#define PRAGMA_DISABLE_WARNING_POP
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
Definition io_util.cpp:1121
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition io_util.cpp:206
iomode_t
Stream I/O mode, e.g.
std::string to_string(const ioaccess_t v) noexcept
Return std::string representation of the given ioaccess_t.
io_result_t
I/O operation result value.
Definition io_util.hpp:55
ioaccess_t
I/O read or write access.
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
Definition io_util.hpp:356
std::string toString(io_result_t v) noexcept
Definition io_util.hpp:67
std::unique_ptr< ByteStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches one of the network protocols supported by libcurl, otherwis...
Definition io_util.cpp:196
@ rw
Read and write capabilities, i.e.
@ atend
Seek to end of (file) stream when opened.
@ trunc
Truncate existing (file) stream when opened with write.
@ read
Read capabilities.
@ NONE
Operation still in progress.
Definition io_util.hpp:60
@ FAILED
Operation failed.
Definition io_util.hpp:57
@ SUCCESS
Operation succeeded.
Definition io_util.hpp:63
@ read
Read intent.
@ none
No error occurred nor has EOS been reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream (EOS).
@ timeout
Input or output operation failed due to timeout.
std::string to_decstring(const value_type &v, const char separator='\'', const nsize_t min_width=0) noexcept
Produces a decimal string representation of an integral value, using the given separator and minimum width.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
Definition backtrace.hpp:32
STL namespace.
relaxed_atomic_uint64 content_length
content_length tracking the content_length
Definition io_util.hpp:338
CXX_ALWAYS_INLINE _Tp load() const noexcept