jaulib v1.4.0-2-g788cf73
Jau Support Library (C++, Java, ..)
Loading...
Searching...
No Matches
byte_stream.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 *
5 * ByteStream, ByteInStream_SecMemory and ByteStream_istream are derived from Botan under same license:
6 * - Copyright (c) 1999-2007 Jack Lloyd
7 * - Copyright (c) 2005 Matthew Gregan
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining
10 * a copy of this software and associated documentation files (the
11 * "Software"), to deal in the Software without restriction, including
12 * without limitation the rights to use, copy, modify, merge, publish,
13 * distribute, sublicense, and/or sell copies of the Software, and to
14 * permit persons to whom the Software is furnished to do so, subject to
15 * the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be
18 * included in all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 */
28
29// #include <botan_all.h>
30
31#include <cstddef>
32#include <cstring>
33#include <limits>
34#include <jau/cpuid.hpp>
35#include <jau/debug.hpp>
36
38#include <jau/io/io_util.hpp>
39#include <jau/io/bit_stream.hpp>
40
41extern "C" {
42 #include <unistd.h>
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46}
47
48#ifdef USE_LIBCURL
49 #include <curl/curl.h>
50#endif
51
52#include <thread>
53#include <pthread.h>
54
55#ifndef O_BINARY
56#define O_BINARY 0
57#endif
58#ifndef O_NONBLOCK
59#define O_NONBLOCK 0
60#endif
61
62using namespace jau::io;
63using namespace jau::fractions_i64_literals;
64using namespace jau::int_literals;
65
66#if defined(__FreeBSD__)
67 typedef off_t off64_t;
68 #define __posix_openat64 ::openat
69 #define __posix_lseek64 ::lseek
70#else
71 #define __posix_openat64 ::openat64
72 #define __posix_lseek64 ::lseek64
73#endif
74
// Default capacity (in bytes) of the ring buffer backing ByteInStream_URL and ByteInStream_Feed.
// With libcurl it holds two of curl's maximum write-callback chunks (CURL_MAX_WRITE_SIZE).
// NOTE(review): the 16384 fallback presumably mirrors curl's default CURL_MAX_WRITE_SIZE - confirm.
75#ifdef USE_LIBCURL
76 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * (size_t)CURL_MAX_WRITE_SIZE;
77#else
78 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * 16384_uz;
79#endif
80
/**
 * Copies `n` bytes from `in` to `out`.
 *
 * Does nothing if either pointer is null or `n` is zero.
 *
 * Deliberately not `constexpr`: every call that actually copies bytes reaches
 * std::memcpy, which is not usable in constant expressions, so the specifier
 * only advertised a compile-time capability the function never had.
 *
 * @param out destination buffer of at least `n` bytes, or nullptr
 * @param in  source buffer of at least `n` bytes, or nullptr
 * @param n   number of bytes to copy
 */
static inline void copy_mem(void* out, const void* in, size_t n) noexcept {
    if( nullptr == in || nullptr == out || 0 == n ) {
        return;
    }
    std::memcpy(out, in, n);
}
86
87size_t ByteStream::discardRead(size_t n) noexcept {
88 if( !good() || !canRead() ) {
89 return 0;
90 }
91 uint8_t buf[1024] = { 0 };
92 size_t discarded = 0;
93
94 while(n)
95 {
96 const size_t got = read(buf, std::min(n, sizeof(buf)));
97 if( 0 == got ) {
98 break;
99 }
100 discarded += got;
101 n -= got;
102 }
103 return discarded;
104}
105
109 if( fail() ) {
110 return ByteStream::npos;
111 } else if( newPos > m_source.size() || newPos > std::numeric_limits<size_t>::max() ) {
112 return ByteStream::npos;
113 }
116 m_offset = static_cast<size_t>(newPos);
117 if( m_mark > newPos ) {
118 m_mark = npos;
119 }
120 if( m_source.size() == m_offset ) {
122 }
123 return m_offset;
124}
125
// Skips up to N bytes by advancing the in-memory offset; no data is copied.
// Returns the number of bytes skipped, bounded by the bytes remaining in m_source,
// or 0 if the stream is not good() or not readable.
126[[nodiscard]] size_t ByteStream_SecMemory::discard(size_t N) noexcept {
127 if( !good() || !canRead() ) {
128 return 0;
129 }
130 size_t n = std::min(N, m_source.size() - m_offset);
131 m_offset += n;
132 if( m_source.size() == m_offset ) {
// NOTE(review): the end-of-source handling (original line 133) is not present in this
// listing; presumably it flags EOF like read() does - confirm.
134 }
135 return n;
136}
137
138[[nodiscard]] bool ByteStream_SecMemory::setMark(size_type) noexcept {
139 m_mark = m_offset;
140 return true;
141}
142
143[[nodiscard]] bool ByteStream_SecMemory::seekMark() noexcept {
144 if( npos == m_mark ) {
145 return false;
146 }
147 return m_mark == seek(m_mark);
148}
149
// Copies up to `length` bytes from the current position into `out` and advances the
// position by that amount. If `out` is nullptr, copy_mem() copies nothing, yet the
// position still advances, i.e. the bytes are skipped.
// Returns the number of bytes consumed; 0 if `length` is 0 or the stream is not good()/readable.
150size_t ByteStream_SecMemory::read(void* out, size_t length) noexcept {
151 if( 0 == length || !good() || !canRead() ) {
152 return 0;
153 }
154 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
155 copy_mem(out, m_source.data() + m_offset, got);
156 m_offset += got;
157 if( m_source.size() == m_offset ) {
// NOTE(review): original line 158 (not shown in this listing) handles reaching the end
// of the source; presumably it sets eofbit - confirm.
159 }
160 return got;
161}
162
163bool ByteStream_SecMemory::available(size_t n) noexcept {
164 return canRead() && m_source.size() - m_offset >= n;
165}
166
// Copies up to `length` bytes, starting `peek_offset` bytes past the current position,
// into `out` without advancing the position.
// Returns the number of bytes copied; 0 if `length` is 0, the stream is not good()/readable,
// or fewer than `peek_offset + 1` bytes remain.
// NOTE(review): original lines 168-169 and 176 are not shown in this listing.
167size_t ByteStream_SecMemory::peek(void* out, size_t length, size_type peek_offset) noexcept {
170 const size_t bytes_left = m_source.size() - m_offset;
// NOTE(review): `peek_offset + 1` wraps to 0 when peek_offset is the maximum of size_type
// (presumably uint64_t, equal to SIZE_MAX on LP64, so the size_t check does not exclude it).
// The bound check is then bypassed and m_source is indexed out of range;
// `bytes_left <= peek_offset` expresses the same bound without overflow.
171 if( 0 == length || !good() || !canRead() ||
172 ( peek_offset > std::numeric_limits<size_t>::max() ) ||
173 ( bytes_left < peek_offset + 1 /* min number of bytes to read */ ) ) {
174 return 0;
175 }
177 const size_t po_sz = static_cast<size_t>(peek_offset);
178 const size_t peek_len = std::min(bytes_left - po_sz, length);
179 copy_mem(out, &m_source[m_offset + po_sz], peek_len);
180 return peek_len;
181}
182
183[[nodiscard]] size_t ByteStream_SecMemory::write(const void* out, size_t length) noexcept {
184 if( 0 == length || fail() || !canWrite() ) {
185 return 0;
186 }
187 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
188 copy_mem(m_source.data() + m_offset, out, got);
189 m_offset += got;
190 return got;
191}
192
195 m_source(cast_char_ptr_to_uint8(in.data()),
196 cast_char_ptr_to_uint8(in.data()) + in.length()),
197 m_offset(0), m_mark(npos)
198{ }
199
201 m_source.clear();
202 m_offset = 0;
204}
205
206std::string ByteStream_SecMemory::toString() const noexcept {
207 return "ByteInStream_SecMemory[content size "+jau::to_decstring(m_source.size())+
208 ", consumed "+jau::to_decstring(m_offset)+
209 ", available "+jau::to_decstring(m_source.size()-m_offset)+
210 ", iomode["+jau::io::to_string(m_iomode)+
211 ", iostate["+jau::io::to_string(rdstate())+
212 "]]";
213}
214
216 if( fail() ||
217 !m_has_content_length ||
218 ( !canWrite() && newPos > m_content_size ) ||
219 newPos > std::numeric_limits<off64_t>::max() )
220 {
221 return ByteStream::npos;
222 }
224 if( newPos != m_offset ) {
225 off64_t abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(newPos), SEEK_SET);
226 if( 0 > abs_pos ) {
228 ERR_PRINT("Failed to seek to position %" PRIu64 " of existing file %s, errno %d %s",
229 newPos, toString().c_str(), errno, strerror(errno));
230 return ByteStream::npos;
231 }
232 m_offset = abs_pos;
233 if( m_mark > m_offset ) {
234 m_mark = npos;
235 }
236 if( m_content_size == m_offset ) {
238 }
239 }
240 return m_offset;
241}
242
243[[nodiscard]] size_t ByteStream_File::discard(size_t N) noexcept {
244 if( !canRead() ) { return 0; }
245 if( m_has_content_length ) {
246 if( !good() ) {
247 return 0;
248 }
249 const size_t n = std::min(N, m_content_size - m_offset);
250 const size_t p0 = m_offset;
251 return seek(p0 + n) - p0;
252 } else {
253 return discardRead(N);
254 }
255}
256
257[[nodiscard]] bool ByteStream_File::setMark(size_type) noexcept {
258 m_mark = m_offset;
259 return true;
260}
261
262[[nodiscard]] bool ByteStream_File::seekMark() noexcept {
263 if( npos == m_mark ) {
264 return false;
265 }
266 return m_mark == seek(m_mark);
267}
268
// Reads up to `length` bytes from the file descriptor into `out`. It loops over short
// reads until `length` bytes were read, read(2) returned 0, or the known content size was
// reached; the latter two set eofbit. EAGAIN, EWOULDBLOCK and EINTR are retried.
// Returns the number of bytes read and advances the stream position by it;
// returns 0 if `length` is 0 or the stream is not good()/readable.
269[[nodiscard]] size_t ByteStream_File::read(void* out, size_t length) noexcept {
270 if( 0 == length || !good() || !canRead() ) {
271 return 0;
272 }
273 uint8_t* out_u8 = static_cast<uint8_t*>(out);
274 size_t total = 0;
275 while( total < length ) {
276 ssize_t len;
277 while ( ( len = ::read(m_fd, out_u8+total, length-total) ) < 0 ) {
278 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
279 // cont temp unavail or interruption
280 // unlikely for regular files and we open w/o O_NONBLOCK
281 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
282 // - EINTR (signal)
283 continue;
284 }
285 // Check errno == ETIMEDOUT ??
// NOTE(review): original line 286 (not shown) presumably records the error state - confirm.
// On a hard error this returns 0 even if earlier iterations already read `total` bytes,
// and m_offset is not advanced although the file position was.
287 DBG_PRINT("ByteInStream_File::read: Error occurred in %s, errno %d %s", toString().c_str(), errno, strerror(errno));
288 return 0;
289 }
290 total += static_cast<size_t>(len);
291 if( 0 == len || ( m_has_content_length && m_offset + total >= m_content_size ) ) {
292 addstate_impl( iostate_t::eofbit ); // Note: std::istream also sets iostate::failbit on eof, we don't.
293 break;
294 }
295 }
296 m_offset += total;
297 return total;
298}
299
// Copies up to `length` bytes located `peek_offset` bytes past the current position into
// `out`, without changing the stream position. It seeks forward relative to the current
// position, reads once (retrying EAGAIN/EINTR) and then seeks back to m_offset.
// Returns the number of bytes copied (0 at EOF), or 0 on invalid arguments or on any
// lseek/read failure, including failure to restore the position.
300size_t ByteStream_File::peek(void* out, size_t length, size_type peek_offset) noexcept {
// NOTE(review): with a known content size this underflows if m_offset > m_content_size,
// which seek() permits for writable streams.
301 const size_type bytes_left = ( m_has_content_length ? m_content_size : std::numeric_limits<off64_t>::max() ) - m_offset;
302 if( 0 == length || !good() || !canRead() ||
303 ( peek_offset > std::numeric_limits<off64_t>::max() ) ||
304 ( bytes_left < peek_offset + 1 /* min number of bytes to read */ ) ) {
305 return 0;
306 }
307 size_t got = 0;
308
309 off64_t abs_pos = 0;
310 if( 0 < peek_offset ) {
311 abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(peek_offset), SEEK_CUR);
312 if( 0 > abs_pos ) {
// NOTE(review): `%zd` expects ssize_t, while peek_offset is size_type (printed via PRIu64 elsewhere).
314 DBG_PRINT("ByteInStream_File::peek: Error occurred (offset1 %zd) in %s, errno %d %s",
315 peek_offset, toString().c_str(), errno, strerror(errno));
316 return 0;
317 }
318 }
// NOTE(review): lseek(..., SEEK_CUR) returns the absolute position m_offset + peek_offset,
// so this test only holds when m_offset == 0 or peek_offset == 0. Peeking at a non-zero
// offset from a non-zero position therefore silently yields 0. Comparing against
// m_offset + peek_offset looks intended - confirm.
319 if( abs_pos == static_cast<off64_t>(peek_offset) ) {
320 ssize_t len = 0;
321 while ( ( len = ::read(m_fd, out, length) ) < 0 ) {
322 if ( errno == EAGAIN || errno == EINTR ) {
323 // cont temp unavail or interruption
324 continue;
325 }
326 // Check errno == ETIMEDOUT ??
// NOTE(review): this early return skips restoring the file position below, leaving the
// descriptor out of sync with m_offset.
328 DBG_PRINT("ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s", toString().c_str(), errno, strerror(errno));
329 return 0;
330 }
331 got = len; // potentially zero bytes, i.e. eof
332 }
333 if( __posix_lseek64(m_fd, static_cast<off64_t>(m_offset), SEEK_SET) < 0 ) {
334 // even though we were able to fetch the desired data above, let's fail if position reset fails
336 DBG_PRINT("ByteInStream_File::peek: Error occurred (offset2 %zd) in %s, errno %d %s",
337 peek_offset, toString().c_str(), errno, strerror(errno));
338 return 0;
339 }
340 return got;
341}
342
343bool ByteStream_File::available(size_t n) noexcept {
344 return isOpen() && good() && canRead() && ( !m_has_content_length || m_content_size - m_offset >= (size_type)n );
345}
346
// Writes `length` bytes from `out` to the file descriptor, looping over short writes;
// EAGAIN and EINTR are retried. The position advances by the bytes written. If the content
// size is tracked, it grows when writing past the previous end.
// Returns the number of bytes written; 0 if `length` is 0, the stream fail()ed or is not writable.
// NOTE(review): original lines 364, 370 and 377 are not shown in this listing.
347size_t ByteStream_File::write(const void* out, size_t length) noexcept {
348 if( 0 == length || fail() || !canWrite() ) {
349 return 0;
350 }
351 const uint8_t* out_u8 = static_cast<const uint8_t*>(out);
352 size_t total = 0;
353 while( total < length ) {
354 ssize_t len;
355 while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) {
356 if ( errno == EAGAIN || errno == EINTR ) {
357 // cont temp unavail or interruption
358 // unlikely for regular files and we open w/o O_NONBLOCK
359 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
360 // - EINTR (signal)
361 continue;
362 }
363 // Check errno == ETIMEDOUT ??
// NOTE(review): on a hard error this returns 0 even if `total` bytes were already written,
// and m_offset / m_content_size are not updated for them.
365 DBG_PRINT("ByteOutStream_File::write: Error occurred in %s, errno %d %s", toString().c_str(), errno, strerror(errno));
366 return 0;
367 }
368 total += static_cast<size_t>(len);
369 if( 0 == len ) {
371 break;
372 }
373 }
374 m_offset += total;
375 if( m_has_content_length && m_offset > m_content_size ) {
376 m_content_size = m_offset;
378 }
379 return total;
380}
381
382static bool _jau_file_size(const int fd, const jau::io::fs::file_stats& stats, const off64_t cur_pos, ByteStream::size_type& len) noexcept {
383 if( stats.has( jau::io::fs::file_stats::field_t::size ) ) {
384 len = stats.size();
385 return true;
386 }
387 if( !stats.is_file() ) {
388 return false; // possible pipe or stdin etc
389 }
390 // should have been covered by stats, but try traditionally harder
391 const off64_t end = __posix_lseek64(fd, 0, SEEK_END);
392 if( 0 > end ) {
393 return false;
394 }
395 const off64_t cur_pos2 = __posix_lseek64(fd, cur_pos, SEEK_SET);
396 if( cur_pos2 != cur_pos ) {
397 DBG_PRINT("ByteInStream_File::file_size: Error rewinding to current position failed, orig-pos %" PRIi64 " -> %" PRIi64 ", errno %d %s.",
398 cur_pos, cur_pos2,
399 errno, strerror(errno));
400 return false;
401 }
402 len = ByteStream::size_type(end) + 1_u64;
403 return true;
404}
405
408 m_stats(fd), m_fd(-1), m_has_content_length(false), m_content_size(0),
409 m_offset(0), m_mark(npos)
410{
411 if( !m_stats.exists() || !m_stats.has_access() ) {
412 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
413 DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", m_stats.toString().c_str(), toString().c_str());
414 return;
415 }
416 m_fd = ::dup(fd);
417 if ( 0 > m_fd ) {
418 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
419 DBG_PRINT("ByteInStream_File::ctor: Error occurred in %s, %s", m_stats.toString().c_str(), toString().c_str());
420 return;
421 }
422 const off64_t cur_pos = __posix_lseek64(m_fd, 0, SEEK_CUR);
423 if( 0 > cur_pos ) {
425 ERR_PRINT("Failed to read position of existing file %s, errno %d %s",
426 toString().c_str(), errno, strerror(errno));
427 return;
428 }
429 m_offset = cur_pos;
430 if( _jau_file_size(m_fd, m_stats, cur_pos, m_content_size) ) {
431 m_has_content_length = true;
432 }
433}
434
// Opens `path` relative to `dirfd` (as openat(2)) for the access requested by `iomode`.
// A leading `file://` scheme is stripped; the condition selecting that branch
// (original line 440) is not shown in this listing.
// On any error failbit is set and the constructor returns early. Otherwise:
// - a file descriptor carried by the file stats is dup()ed,
// - else the file is opened: created if missing (only possible in write modes, since a
//   non-existing read-only file is rejected), or truncated when writable with iomode_t::trunc,
// - m_offset and the content size are initialized from the current position and file size,
// - with iomode_t::atend a regular file is positioned at its end.
435ByteStream_File::ByteStream_File(const int dirfd, const std::string& path, iomode_t iomode, const jau::io::fs::fmode_t fmode, lb_endian_t byteOrder) noexcept
436: ByteStream(iomode, byteOrder),
437 m_stats(), m_fd(-1), m_has_content_length(false), m_content_size(0),
438 m_offset(0), m_mark(npos)
439{
441 // cut off leading `file://`
442 std::string path2 = path.substr(7);
443 m_stats = jau::io::fs::file_stats(dirfd, path2);
444 } else {
445 m_stats = jau::io::fs::file_stats(dirfd, path);
446 }
447 if( !canRead() && !canWrite() ) {
448 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
449 DBG_PRINT("ByteStream_File::ctor: Error, iomode_t invalid: %s, %s", to_string(m_iomode).c_str(), toString().c_str());
450 return;
451 }
452 if( ( m_stats.exists() && !m_stats.is_file() && !m_stats.has_fd() ) || !m_stats.has_access() ) {
453 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ofstream open (?)
454 DBG_PRINT("ByteStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", m_stats.toString().c_str(), toString().c_str());
455 return;
456 }
457 if( !canWrite() && !m_stats.exists() ) {
458 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
459 DBG_PRINT("ByteStream_File::ctor: Error, can't open non-existing read-only file in %s, %s", m_stats.toString().c_str(), toString().c_str());
460 return;
461 }
462 bool truncated = false;
463 bool just_opened = false;
464 if( m_stats.has_fd() ) {
465 m_fd = ::dup( m_stats.fd() );
466 } else {
467 // O_NONBLOCK, is useless on files and counter to this class logic
468 int flags = O_BINARY|O_NOCTTY;
469 if( canRead() && !canWrite() ) {
470 flags |= O_RDONLY;
471 } else if( !canRead() && canWrite() ) {
472 flags |= O_WRONLY;
473 } else {
474 flags |= O_RDWR;
475 }
476 if( !m_stats.exists() ) {
477 flags |= O_CREAT;
478 } else if( canWrite() && is_set(m_iomode, iomode_t::trunc) ) {
479 flags |= O_TRUNC;
480 truncated = true;
481 }
482 m_fd = __posix_openat64(dirfd, m_stats.path().c_str(), flags, jau::io::fs::posix_protection_bits( fmode ));
483 just_opened = true;
484 }
485 if ( 0 > m_fd ) {
486 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
487 DBG_PRINT("ByteInStream_File::ctor: Error while opening %s, %s", m_stats.toString().c_str(), toString().c_str());
488 return;
489 }
490 if( truncated ) {
// NOTE(review): m_has_content_length stays false here, although the size is known to be 0.
491 // m_content_size = 0, m_offset = 0
492 return;
493 }
// A freshly opened file starts at 0; only a dup()ed regular file needs its position queried.
494 const off64_t cur_pos = just_opened || !m_stats.is_file() ? 0 : __posix_lseek64(m_fd, 0, SEEK_CUR);
495 if( 0 > cur_pos ) {
// NOTE(review): original lines 496 and 508 are not shown; presumably they set an error state - confirm.
497 ERR_PRINT("Failed to read position of existing file %s, errno %d %s",
498 toString().c_str(), errno, strerror(errno));
499 return;
500 }
501 m_offset = cur_pos;
// NOTE(review): m_stats was taken before the open, so for a file just created via O_CREAT
// it presumably still describes a non-existing element, leaving no content length tracked - confirm.
502 if( _jau_file_size(m_fd, m_stats, cur_pos, m_content_size) ) {
503 m_has_content_length = true;
504 }
505 if( m_stats.is_file() && is_set(m_iomode, iomode_t::atend) ) {
506 const off64_t end = __posix_lseek64(m_fd, 0, SEEK_END);
507 if( 0 > end ) {
509 ERR_PRINT("Failed to position existing file to end %s, errno %d %s",
510 toString().c_str(), errno, strerror(errno));
511 return;
512 }
513 m_offset = end;
514 }
515}
516
518: ByteStream_File(AT_FDCWD, path, mode, fmode, byteOrder) {}
519
// Closes the owned file descriptor, if any, and resets it to -1 so isOpen()-style
// checks on m_fd see the stream as closed.
// NOTE(review): the return value of close(2) is ignored. Original line 524 (after the reset
// of m_fd) is not shown in this listing, so its effect cannot be determined here.
520void ByteStream_File::close() noexcept {
521 if( 0 <= m_fd ) {
522 ::close(m_fd);
523 m_fd = -1;
525 }
526}
527
528void ByteStream_File::flush() noexcept {
529 if( 0 <= m_fd && canWrite() ) {
530 ::fdatasync(m_fd);
531 }
532}
533
534std::string ByteStream_File::toString() const noexcept {
535 return "ByteInStream_File[content_length "+( hasContentSize() ? jau::to_decstring(m_content_size) : "n/a" )+
536 ", consumed "+jau::to_decstring(m_offset)+
537 ", available "+jau::to_decstring(get_available())+
538 ", fd "+std::to_string(m_fd)+
539 ", iomode["+jau::io::to_string(m_iomode)+
540 ", iostate["+jau::io::to_string(rdstate())+
541 "], "+m_stats.toString()+
542 "]";
543}
544
545
548 m_url(std::move(url)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
549 m_stream_resp( read_url_stream_async(nullptr, m_url, /*httpPostReq=*/nullptr, &m_buffer, AsyncStreamConsumerFunc()) ),
550 m_offset(0), m_mark(npos), m_rewindbuf()
551{ }
552
// Ends the URL stream. A still-processing transfer is first flagged as finished (SUCCESS).
// Then the ring buffer is closed, zeroing its memory and unblocking pending reads/writes,
// and finally the worker thread is joined.
// The order matters: the buffer is presumably closed before join() so a worker blocked
// on the buffer can return.
553void ByteInStream_URL::close() noexcept {
554 DBG_PRINT("ByteInStream_URL: close.0 %s, %s", id().c_str(), to_string_int().c_str());
555
556 if( m_stream_resp->processing() ) {
557 m_stream_resp->result = io_result_t::SUCCESS; // signal end of streaming
558 }
559
560 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
561 if( m_stream_resp->thread.joinable() ) {
562 DBG_PRINT("ByteInStream_URL: close.1 %s, %s", id().c_str(), m_buffer.toString().c_str());
563 m_stream_resp->thread.join();
564 }
// Leave the response's thread object in the default (non-joinable) state.
565 std::thread none;
566 m_stream_resp->thread.swap(none);
567 DBG_PRINT("ByteInStream_URL: close.X %s, %s", id().c_str(), to_string_int().c_str());
568}
569
// Returns whether at least `n` bytes can be read.
// If the transfer has ended (or the stream is not good()/readable), only the bytes already
// buffered count. Otherwise it:
// - waits for the response header,
// - rejects requests exceeding the announced content length,
// - blocks up to m_timeout for `n` buffered bytes.
// On timeout a still-processing transfer is marked FAILED and the ring-buffer writer is interrupted.
// NOTE(review): the first branch also answers from the buffer when !canRead().
570bool ByteInStream_URL::available(size_t n) noexcept {
571 if( !good() || !canRead() || !m_stream_resp->processing() ) {
572 // url thread ended, only remaining bytes in buffer available left
573 return m_buffer.size() >= n;
574 }
575 m_stream_resp->header_resp.wait_until_completion(m_timeout);
// NOTE(review): `content_length - m_offset` underflows should m_offset ever exceed the
// announced content length.
576 if( m_stream_resp->has_content_length && m_stream_resp->content_length - m_offset < n ) {
577 return false;
578 }
579 // I/O still in progress, we have to poll until data is available or timeout
580 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
581 bool timeout_occured;
582 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
583 if( avail < n ) {
584 if( timeout_occured ) {
// NOTE(review): original line 585 is not shown in this listing.
586 if( m_stream_resp->processing() ) {
587 m_stream_resp->result = io_result_t::FAILED;
588 }
589 m_buffer.interruptWriter();
590 }
591 return false;
592 } else {
593 return true;
594 }
595}
596
597bool ByteInStream_URL::isOpen() const noexcept {
598 // url thread has not ended or remaining bytes in buffer available left
599 return m_stream_resp->processing() || m_buffer.size() > 0;
600}
601
602bool ByteInStream_URL::hasContentSize() const noexcept {
603 m_stream_resp->header_resp.wait_until_completion(m_timeout);
604 return m_stream_resp->has_content_length;
605}
606
608 m_stream_resp->header_resp.wait_until_completion(m_timeout);
609 const size_type length = contentSize();
610 if( fail() ) {
611 return ByteStream::npos;
612 } else if( m_rewindbuf.covered(m_mark, newPos) ){
614 m_offset = newPos;
615 return m_offset;
616 } else if( !hasContentSize() ) {
617 return ByteStream::npos;
618 } else if( newPos > length ) {
619 return ByteStream::npos;
620 } else if(newPos >= m_offset) {
622 discardRead(newPos - m_offset);
623 return m_offset;
624 } else {
625 DBG_PRINT("ByteInStream_URL::seek newPos %" PRIu64 "< position %" PRIu64 " not implemented", newPos, m_offset);
626 return ByteStream::npos;
627 }
628}
629
630[[nodiscard]] size_t ByteInStream_URL::discard(size_t N) noexcept {
631 return discardRead(N);
632}
633
634[[nodiscard]] bool ByteInStream_URL::setMark(size_type readLimit) noexcept {
635 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
636 m_mark = m_offset;
637 return true;
638 } else {
639 return false;
640 }
641}
642
643[[nodiscard]] bool ByteInStream_URL::seekMark() noexcept {
644 if( npos == m_mark ) {
645 return false;
646 }
647 return m_mark == seek(m_mark);
648}
649
650size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
651 m_stream_resp->header_resp.wait_until_completion(m_timeout);
652 if( 0 == length || !good() || !canRead() ) {
653 return 0;
654 }
655 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
656}
657
658size_t ByteInStream_URL::peek(void* out, size_t length, size_t peek_offset) noexcept {
659 (void)out;
660 (void)length;
661 (void)peek_offset;
662 DBG_PRINT("ByteInStream_URL::peek not implemented");
663 return 0;
664}
665
667 if ( ( !m_stream_resp->processing() && m_buffer.isEmpty() ) ||
668 ( m_stream_resp->has_content_length && m_offset >= m_stream_resp->content_length ) )
669 {
671 }
672 if( m_stream_resp->failed() ) {
674 }
675 return rdstate_impl();
676}
677
678std::string ByteInStream_URL::to_string_int() const noexcept {
679 return m_url+", Url[content_length "+( hasContentSize() ? jau::to_decstring(m_stream_resp->content_length.load()) : "n/a" )+
680 ", xfered "+jau::to_decstring(m_stream_resp->total_read.load())+
681 ", result "+std::to_string((int8_t)m_stream_resp->result.load())+
682 "], consumed "+jau::to_decstring(m_offset)+
683 ", available "+jau::to_decstring(get_available())+
684 ", iomode["+jau::io::to_string(m_iomode)+
685 ", iostate["+jau::io::to_string(rdstate())+
686 "], rewind[mark "+std::to_string(m_mark)+", "+m_rewindbuf.toString()+
687 "], "+m_buffer.toString();
688}
689std::string ByteInStream_URL::toString() const noexcept {
690 return "ByteInStream_URL["+to_string_int()+"]";
691}
692
// Creates a readable ByteStream for `path_or_uri`.
// For URIs not using the local-file protocol a ByteInStream_URL is tried first; the rest of
// that condition (original line 695) is not shown in this listing. Otherwise, or if that
// stream fail()s, a read-only ByteStream_File is opened.
// Returns nullptr if neither stream could be opened.
// NOTE(review): std::make_unique never yields nullptr (it throws, which terminates inside
// this noexcept function), so the nullptr checks below are redundant.
693std::unique_ptr<ByteStream> jau::io::to_ByteInStream(const std::string& path_or_uri, jau::fraction_i64 timeout) noexcept {
694 if( !jau::io::uri_tk::is_local_file_protocol(path_or_uri) &&
696 {
697 std::unique_ptr<ByteStream> res = std::make_unique<ByteInStream_URL>(path_or_uri, timeout);
698 if( nullptr != res && !res->fail() ) {
699 return res;
700 }
701 }
702 std::unique_ptr<ByteStream> res = std::make_unique<ByteStream_File>(path_or_uri, iomode_t::read);
703 if( nullptr != res && !res->fail() ) {
704 return res;
705 }
706 return nullptr;
707}
708
711 m_id(std::move(id_name)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
712 m_has_content_length( false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result( io::io_result_t::NONE ),
713 m_offset(0)
714{ }
715
717 DBG_PRINT("ByteInStream_Feed: close.0 %s, %s", id().c_str(), toStringInt().c_str());
718
719 if( io_result_t::NONE == m_result ) {
720 m_result = io_result_t::SUCCESS; // signal end of streaming
721 }
722 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
723 DBG_PRINT("ByteInStream_Feed: close.X %s, %s", id().c_str(), toStringInt().c_str());
724}
725
// Returns whether at least `n` bytes can be read.
// Once the feeder has ended (m_result != NONE) or the stream is not good()/readable, only
// the bytes already buffered count. Otherwise it rejects requests exceeding a known content
// size, then blocks up to m_timeout for `n` buffered bytes. On timeout a still-running feed
// is marked FAILED and the ring-buffer writer is interrupted.
726bool ByteInStream_Feed::available(size_t n) noexcept {
727 if( !good() || !canRead() || io_result_t::NONE != m_result ) {
728 // feeder completed, only remaining bytes in buffer available left
729 return m_buffer.size() >= n;
730 }
// NOTE(review): `m_content_size - m_offset` underflows should m_offset ever exceed m_content_size.
731 if( m_has_content_length && m_content_size - m_offset < n ) {
732 return false;
733 }
734 // I/O still in progress, we have to poll until data is available or timeout
735 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
736 bool timeout_occured;
737 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
738 if( avail < n ) {
739 if( timeout_occured ) {
// NOTE(review): original line 740 is not shown in this listing.
741 if( io_result_t::NONE == m_result ) {
742 m_result = io_result_t::FAILED;
743 }
744 m_buffer.interruptWriter();
745 }
746 return false;
747 } else {
748 return true;
749 }
750}
751
752bool ByteInStream_Feed::isOpen() const noexcept {
753 // feeder has not ended or remaining bytes in buffer available left
754 return io_result_t::NONE == m_result || m_buffer.size() > 0;
755}
756
758 const size_type length = m_content_size;
759 if( fail() ) {
760 return ByteStream::npos;
761 } else if( m_rewindbuf.covered(m_mark, newPos) ){
763 m_offset = newPos;
764 return m_offset;
765 } else if( !hasContentSize() ) {
766 return ByteStream::npos;
767 } else if( newPos > length ) {
768 return ByteStream::npos;
769 } else if(newPos >= m_offset) {
771 discardRead(newPos - m_offset);
772 return m_offset;
773 } else {
774 DBG_PRINT("ByteInStream_Feed::seek newPos %" PRIu64 "< position %" PRIu64 " not implemented", newPos, m_offset);
775 return ByteStream::npos;
776 }
777}
778
779[[nodiscard]] size_t ByteInStream_Feed::discard(size_t N) noexcept {
780 return discardRead(N);
781}
782
783[[nodiscard]] bool ByteInStream_Feed::setMark(size_type readLimit) noexcept {
784 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
785 m_mark = m_offset;
786 return true;
787 } else {
788 return false;
789 }
790}
791
792[[nodiscard]] bool ByteInStream_Feed::seekMark() noexcept {
793 if( npos == m_mark ) {
794 return false;
795 }
796 return m_mark == seek(m_mark);
797}
798
799size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
800 if( 0 == length || !good() || !canRead() ) {
801 return 0;
802 }
803 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
804}
805
806size_t ByteInStream_Feed::peek(void* out, size_t length, size_t peek_offset) noexcept {
807 (void)out;
808 (void)length;
809 (void)peek_offset;
810 DBG_PRINT("ByteInStream_Feed::peek not implemented");
811 return 0;
812}
813
815 if ( ( io_result_t::NONE != m_result && m_buffer.isEmpty() ) ||
816 ( m_has_content_length && m_offset >= m_content_size ) )
817 {
819 }
820 if( io_result_t::FAILED == m_result ) {
822 }
823 return rdstate_impl();
824}
825
// Feeds `length` bytes from `in` into the ring buffer, blocking up to `timeout` for space.
// Accepted only while writable, good() and the feeder has not ended (m_result == NONE).
// On success it adds `length` to the transferred-byte counter and returns `length`.
// Otherwise it returns 0. A failed put interrupts the writer if it timed out, and marks
// the feed FAILED unless a result was already set.
// NOTE(review): original lines 835 and 838 are not shown in this listing.
826size_t ByteInStream_Feed::write(const void* in, size_t length, const jau::fraction_i64& timeout) noexcept {
827 if( 0 < length && canWrite() && good() && io_result_t::NONE == m_result ) { // feeder still running
828 bool timeout_occured;
829 const uint8_t *in8 = reinterpret_cast<const uint8_t*>(in);
830 if( m_buffer.putBlocking(in8, in8+length, timeout, timeout_occured) ) {
831 m_total_xfered.fetch_add(length);
832 return length;
833 } else {
834 if( timeout_occured ) {
836 m_buffer.interruptWriter();
837 } else {
839 }
840 if( io_result_t::NONE == m_result ) {
841 m_result = io_result_t::FAILED;
842 }
843 return 0;
844 }
845 } else {
846 return 0;
847 }
848}
849
850void ByteInStream_Feed::setEOF(const io_result_t result) noexcept {
851 m_result = result;
852 m_buffer.set_end_of_input(true); // still considering last data, also irqs blocking ringbuffer reader
853}
854
855std::string ByteInStream_Feed::toStringInt() const noexcept {
856 return m_id+", ext[content_length "+( hasContentSize() ? jau::to_decstring(m_content_size.load()) : "n/a" )+
857 ", xfered "+jau::to_decstring(m_total_xfered.load())+
858 ", result "+std::to_string((int8_t)m_result.load())+
859 "], consumed "+std::to_string(m_offset)+
860 ", available "+std::to_string(getAvailable())+
861 ", iomode["+jau::io::to_string(m_iomode)+
862 ", iostate["+jau::io::to_string(rdstate())+
863 "], "+m_buffer.toString();
864}
865
866std::string ByteInStream_Feed::toString() const noexcept {
867 return "ByteInStream_Feed["+toStringInt()+"]";
868}
869
872 m_parent.close();
873 DBG_PRINT("ByteInStream_Recorder: close.X %s", id().c_str());
874}
875
877 m_buffer.resize(0);
878 m_rec_offset = m_offset;
879 m_is_recording = true;
880}
881
883 m_is_recording = false;
884}
885
887 m_is_recording = false;
888 m_buffer.clear();
889 m_rec_offset = 0;
890}
891
892size_t ByteStream_Recorder::read(void* out, size_t length) noexcept {
893 const size_t consumed_bytes = m_parent.read(out, length);
894 m_offset += consumed_bytes;
895 if( isRecording() && consumed_bytes > 0 ) {
896 uint8_t* out_u8 = static_cast<uint8_t*>(out);
897 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
898 }
899 return consumed_bytes;
900}
901
902size_t ByteStream_Recorder::write(const void* out, size_t length) noexcept {
903 const size_t consumed_bytes = m_parent.write(out, length);
904 m_offset += consumed_bytes;
905 if( isRecording() && consumed_bytes > 0 ) {
906 const uint8_t* out_u8 = reinterpret_cast<const uint8_t*>(out);
907 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
908 }
909 return consumed_bytes;
910}
911
912std::string ByteStream_Recorder::toString() const noexcept {
913 return "ByteInStream_Recorder[parent "+m_parent.id()+", recording[on "+std::to_string(m_is_recording)+
914 " offset "+jau::to_decstring(m_rec_offset)+
915 "], consumed "+jau::to_decstring(m_offset)+
916 ", iomode["+jau::io::to_string(m_iomode)+
917 ", iostate["+jau::io::to_string(rdstate())+"]]";
918}
919
920std::string jau::io::to_string(const ioaccess_t v) noexcept {
921 return v == ioaccess_t::read ? "read" : "write";
922}
static constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
static bool _jau_file_size(const int fd, const jau::io::fs::file_stats &stats, const off64_t cur_pos, ByteStream::size_type &len) noexcept
#define O_BINARY
#define __posix_lseek64
#define __posix_openat64
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
std::string toString() const noexcept override
void setEOF(const io_result_t result) noexcept
Set end-of-data (EOS), i.e.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Not implemented, returns 0.
iostate_t rdstate() const noexcept override
Returns the current state flags.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t write(const void *in, size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
size_t read(void *out, size_t length) noexcept override
Read from the source.
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
std::string toString() const noexcept override
bool isOpen() const noexcept override
Checks if the stream has an associated file.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate_t rdstate() const noexcept override
Returns the current state flags.
size_t read(void *out, size_t length) noexcept override
Read from the source.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_type contentSize() const noexcept override
Returns the content_size if known.
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void flush() noexcept override
Synchronizes all output operations, or do nothing.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
std::string toString() const noexcept override
ByteStream_File(const std::string &path, iomode_t iomode=iomode_t::rw, const jau::io::fs::fmode_t fmode=fs::fmode_t::def_file_prot, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a stream based byte stream from filesystem path, either an existing or new file.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
std::string toString() const noexcept override
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void startRecording() noexcept
Starts the recording.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
void stopRecording() noexcept
Stops the recording.
size_t read(void *, size_t) noexcept override
Read from the source.
iostate_t rdstate() const noexcept override
Returns the current state flags.
void clearRecording() noexcept
Clears the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string toString() const noexcept override
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
ByteStream_SecMemory(const std::string &in, lb_endian_t byteOrder=lb_endian_t::little)
Construct a secure memory source that reads from a string, iomode_t::read.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
constexpr bool canWrite() const noexcept
Returns true in case stream has iomode::write capabilities.
constexpr bool canRead() const noexcept
Returns true in case stream has iomode::read capabilities.
size_t discardRead(size_t n) noexcept
Fallback slow discard implementation using read() in case of unknown stream size.
constexpr lb_endian_t byteOrder() const noexcept
Returns endian byte-order of stream storage.
uint64_t size_type
uint64_t size data type, bit position and count
ByteStream(iomode_t mode, lb_endian_t byteOrder=lb_endian_t::little) noexcept
constexpr iomode_t mode() const noexcept
static constexpr size_type npos
Invalid position constant, denoting unset mark() or invalid position.
bool fail() const noexcept
Checks if an error has occurred.
virtual iostate_t rdstate() const noexcept
Returns the current state flags.
void clearStateFlags(const iostate_t clr) noexcept
Clears given state flags from current value.
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
constexpr iostate_t rdstate_impl() const noexcept
constexpr void addstate_impl(iostate_t state) const noexcept
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition debug.hpp:122
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition debug.hpp:72
lb_endian_t
Simplified reduced endian type only covering little- and big-endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
#define PRAGMA_DISABLE_WARNING_TYPE_RANGE_LIMIT
constexpr bool is_set(const E mask, const E bits) noexcept
#define PRAGMA_DISABLE_WARNING_PUSH
#define PRAGMA_DISABLE_WARNING_POP
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
Definition io_util.cpp:1119
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition io_util.cpp:205
iomode_t
Stream I/O mode, e.g.
std::string to_string(const ioaccess_t v) noexcept
Return std::string representation of the given ioaccess_t.
io_result_t
I/O operation result value.
Definition io_util.hpp:55
ioaccess_t
I/O read or write access.
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
Definition io_util.hpp:356
std::string toString(io_result_t v) noexcept
Definition io_util.hpp:67
std::unique_ptr< ByteStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
Definition io_util.cpp:195
@ rw
Read and write capabilities, i.e.
@ atend
Seek to end of (file) stream when opened.
@ trunc
Truncate existing (file) stream when opened with write.
@ read
Read capabilities.
@ NONE
Operation still in progress.
Definition io_util.hpp:60
@ FAILED
Operation failed.
Definition io_util.hpp:57
@ SUCCESS
Operation succeeded.
Definition io_util.hpp:63
@ read
Read intent.
@ none
No error occurred nor has EOS been reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream (EOS).
@ timeout
Input or output operation failed due to timeout.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
Definition backtrace.hpp:32
STL namespace.
relaxed_atomic_uint64 content_length
content_length tracking the content_length
Definition io_util.hpp:338
CXX_ALWAYS_INLINE _Tp load() const noexcept