jaulib v1.5.0
Jau Support Library (C++, Java, ..)
Loading...
Searching...
No Matches
byte_stream.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 *
5 * ByteStream, ByteInStream_SecMemory and ByteStream_istream are derived from Botan under same license:
6 * - Copyright (c) 1999-2007 Jack Lloyd
7 * - Copyright (c) 2005 Matthew Gregan
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining
10 * a copy of this software and associated documentation files (the
11 * "Software"), to deal in the Software without restriction, including
12 * without limitation the rights to use, copy, modify, merge, publish,
13 * distribute, sublicense, and/or sell copies of the Software, and to
14 * permit persons to whom the Software is furnished to do so, subject to
15 * the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be
18 * included in all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 */
28
29// #include <botan_all.h>
30
31#include <cstddef>
32#include <cstring>
33#include <limits>
34#include <jau/cpuid.hpp>
35#include <jau/debug.hpp>
36
38#include <jau/io/io_util.hpp>
39#include <jau/io/bit_stream.hpp>
40
41extern "C" {
42 #include <unistd.h>
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46}
47
48#ifdef USE_LIBCURL
49 #include <curl/curl.h>
50#endif
51
52#include <thread>
53#include <pthread.h>
54
55#ifndef O_BINARY
56#define O_BINARY 0
57#endif
58#ifndef O_NONBLOCK
59#define O_NONBLOCK 0
60#endif
61
62using namespace jau::io;
63using namespace jau::fractions_i64_literals;
64using namespace jau::int_literals;
65
66#if defined(__FreeBSD__)
67 typedef off_t off64_t;
68 #define __posix_openat64 ::openat
69 #define __posix_lseek64 ::lseek
70#else
71 #define __posix_openat64 ::openat64
72 #define __posix_lseek64 ::lseek64
73#endif
74
75#ifdef USE_LIBCURL
76 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * (size_t)CURL_MAX_WRITE_SIZE;
77#else
78 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * 16384_uz;
79#endif
80
/**
 * Copies `n` bytes from `in` to `out`.
 *
 * Does nothing if either pointer is null or `n` is zero.
 */
inline constexpr static void copy_mem(void* out, const void* in, size_t n) noexcept {
    if( nullptr == in || nullptr == out || 0 == n ) {
        return;
    }
    std::memcpy(out, in, n * sizeof(uint8_t));
}
86
87size_t ByteStream::discardRead(size_t n) noexcept {
88 if( !good() || !canRead() ) {
89 return 0;
90 }
91 uint8_t buf[1024] = { 0 };
92 size_t discarded = 0;
93
94 while(n)
95 {
96 const size_t got = read(buf, std::min(n, sizeof(buf)));
97 if( 0 == got ) {
98 break;
99 }
100 discarded += got;
101 n -= got;
102 }
103 return discarded;
104}
105
109 if( fail() ) {
110 return ByteStream::npos;
111 } else if( newPos > m_source.size() || newPos > std::numeric_limits<size_t>::max() ) {
112 return ByteStream::npos;
113 }
116 m_offset = static_cast<size_t>(newPos);
117 if( m_mark > newPos ) {
118 m_mark = npos;
119 }
120 if( m_source.size() == m_offset ) {
122 }
123 return m_offset;
124}
125
126[[nodiscard]] size_t ByteStream_SecMemory::discard(size_t N) noexcept {
127 if( !good() || !canRead() ) {
128 return 0;
129 }
130 size_t n = std::min(N, m_source.size() - m_offset);
131 m_offset += n;
132 if( m_source.size() == m_offset ) {
134 }
135 return n;
136}
137
138[[nodiscard]] bool ByteStream_SecMemory::setMark(size_type) noexcept {
139 m_mark = m_offset;
140 return true;
141}
142
143[[nodiscard]] bool ByteStream_SecMemory::seekMark() noexcept {
144 if( npos == m_mark ) {
145 return false;
146 }
147 return m_mark == seek(m_mark);
148}
149
150size_t ByteStream_SecMemory::read(void* out, size_t length) noexcept {
151 if( 0 == length || !good() || !canRead() ) {
152 return 0;
153 }
154 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
155 copy_mem(out, m_source.data() + m_offset, got);
156 m_offset += got;
157 if( m_source.size() == m_offset ) {
159 }
160 return got;
161}
162
163bool ByteStream_SecMemory::available(size_t n) noexcept {
164 return canRead() && m_source.size() - m_offset >= n;
165}
166
167size_t ByteStream_SecMemory::peek(void* out, size_t length, size_type peek_offset) noexcept {
170 const size_t bytes_left = m_source.size() - m_offset;
171 if( 0 == length || !good() || !canRead() ||
172 ( peek_offset > std::numeric_limits<size_t>::max() ) ||
173 ( bytes_left < peek_offset + 1 /* min number of bytes to read */ ) ) {
174 return 0;
175 }
177 const size_t po_sz = static_cast<size_t>(peek_offset);
178 const size_t peek_len = std::min(bytes_left - po_sz, length);
179 copy_mem(out, &m_source[m_offset + po_sz], peek_len);
180 return peek_len;
181}
182
183[[nodiscard]] size_t ByteStream_SecMemory::write(const void* out, size_t length) noexcept {
184 if( 0 == length || fail() || !canWrite() ) {
185 return 0;
186 }
187 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
188 copy_mem(m_source.data() + m_offset, out, got);
189 m_offset += got;
190 return got;
191}
192
195 m_source(cast_char_ptr_to_uint8(in.data()),
196 cast_char_ptr_to_uint8(in.data()) + in.length()),
197 m_offset(0), m_mark(npos)
198{ }
199
201 m_source.clear();
202 m_offset = 0;
204}
205
206std::string ByteStream_SecMemory::toString() const noexcept {
207 return "ByteInStream_SecMemory[content size "+jau::to_decstring(m_source.size())+
208 ", consumed "+jau::to_decstring(m_offset)+
209 ", available "+jau::to_decstring(m_source.size()-m_offset)+
210 ", iomode["+jau::io::to_string(m_iomode)+
211 ", iostate["+jau::io::to_string(rdstate())+
212 "]]";
213}
214
216 if( fail() ||
217 !m_has_content_length ||
218 ( !canWrite() && newPos > m_content_size ) ||
219 newPos > std::numeric_limits<off64_t>::max() )
220 {
221 return ByteStream::npos;
222 }
224 if( newPos != m_offset ) {
225 off64_t abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(newPos), SEEK_SET);
226 if( 0 > abs_pos ) {
228 jau_ERR_PRINT("Failed to seek to position %" PRIu64 " of existing file %s",
229 newPos, toString().c_str());
230 return ByteStream::npos;
231 }
232 m_offset = abs_pos;
233 if( m_mark > m_offset ) {
234 m_mark = npos;
235 }
236 if( m_content_size == m_offset ) {
238 }
239 }
240 return m_offset;
241}
242
243[[nodiscard]] size_t ByteStream_File::discard(size_t N) noexcept {
244 if( !canRead() ) { return 0; }
245 if( m_has_content_length ) {
246 if( !good() ) {
247 return 0;
248 }
249 const size_t n = std::min<size_t>(N, m_content_size - m_offset);
250 const size_t p0 = m_offset;
251 return seek(p0 + n) - p0;
252 } else {
253 return discardRead(N);
254 }
255}
256
257[[nodiscard]] bool ByteStream_File::setMark(size_type) noexcept {
258 m_mark = m_offset;
259 return true;
260}
261
262[[nodiscard]] bool ByteStream_File::seekMark() noexcept {
263 if( npos == m_mark ) {
264 return false;
265 }
266 return m_mark == seek(m_mark);
267}
268
269[[nodiscard]] size_t ByteStream_File::read(void* out, size_t length) noexcept {
270 if( 0 == length || !good() || !canRead() ) {
271 return 0;
272 }
273 uint8_t* out_u8 = static_cast<uint8_t*>(out);
274 size_t total = 0;
275 while( total < length ) {
276 ssize_t len;
277 while ( ( len = ::read(m_fd, out_u8+total, length-total) ) < 0 ) {
278 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
279 // cont temp unavail or interruption
280 // unlikely for regular files and we open w/o O_NONBLOCK
281 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
282 // - EINTR (signal)
283 continue;
284 }
285 // Check errno == ETIMEDOUT ??
287 jau_DBG_PRINT("ByteInStream_File::read: Error occurred in %s, errno %d %s", toString(), (int)errno, strerror(errno));
288 return 0;
289 }
290 total += static_cast<size_t>(len);
291 if( 0 == len || ( m_has_content_length && m_offset + total >= m_content_size ) ) {
292 addstate_impl( iostate_t::eofbit ); // Note: std::istream also sets iostate::failbit on eof, we don't.
293 break;
294 }
295 }
296 m_offset += total;
297 return total;
298}
299
300size_t ByteStream_File::peek(void* out, size_t length, size_type peek_offset) noexcept {
301 const size_type bytes_left = ( m_has_content_length ? m_content_size : std::numeric_limits<off64_t>::max() ) - m_offset;
302 if( 0 == length || !good() || !canRead() ||
303 ( peek_offset > std::numeric_limits<off64_t>::max() ) ||
304 ( bytes_left < peek_offset + 1 /* min number of bytes to read */ ) ) {
305 return 0;
306 }
307 size_t got = 0;
308
309 off64_t abs_pos = 0;
310 if( 0 < peek_offset ) {
311 abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(peek_offset), SEEK_CUR);
312 if( 0 > abs_pos ) {
314 jau_DBG_PRINT("ByteInStream_File::peek: Error occurred (offset1 %zu) in %s, errno %d %s",
315 peek_offset, toString(), (int)errno, strerror(errno));
316 return 0;
317 }
318 }
319 if( abs_pos == static_cast<off64_t>(peek_offset) ) { // NOLINT(modernize-use-integer-sign-comparison): false positive
320 ssize_t len = 0;
321 while ( ( len = ::read(m_fd, out, length) ) < 0 ) {
322 if ( errno == EAGAIN || errno == EINTR ) {
323 // cont temp unavail or interruption
324 continue;
325 }
326 // Check errno == ETIMEDOUT ??
328 jau_DBG_PRINT("ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s", toString(), (int)errno, strerror(errno));
329 return 0;
330 }
331 got = len; // potentially zero bytes, i.e. eof
332 }
333 if( __posix_lseek64(m_fd, static_cast<off64_t>(m_offset), SEEK_SET) < 0 ) {
334 // even though we were able to fetch the desired data above, let's fail if position reset fails
336 jau_DBG_PRINT("ByteInStream_File::peek: Error occurred (offset2 %zu) in %s, errno %d %s",
337 peek_offset, toString(), (int)errno, strerror(errno));
338 return 0;
339 }
340 return got;
341}
342
343bool ByteStream_File::available(size_t n) noexcept {
344 return isOpen() && good() && canRead() && ( !m_has_content_length || m_content_size - m_offset >= (size_type)n );
345}
346
347size_t ByteStream_File::write(const void* out, size_t length) noexcept {
348 if( 0 == length || fail() || !canWrite() ) {
349 return 0;
350 }
351 const uint8_t* out_u8 = static_cast<const uint8_t*>(out);
352 size_t total = 0;
353 while( total < length ) {
354 ssize_t len;
355 while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) {
356 if ( errno == EAGAIN || errno == EINTR ) {
357 // cont temp unavail or interruption
358 // unlikely for regular files and we open w/o O_NONBLOCK
359 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
360 // - EINTR (signal)
361 continue;
362 }
363 // Check errno == ETIMEDOUT ??
365 jau_DBG_PRINT("ByteOutStream_File::write: Error occurred in %s, errno %d %s", toString(), (int)errno, strerror(errno));
366 return 0;
367 }
368 total += static_cast<size_t>(len);
369 if( 0 == len ) {
371 break;
372 }
373 }
374 m_offset += total;
375 if( m_has_content_length && m_offset > m_content_size ) {
376 m_content_size = m_offset;
378 }
379 return total;
380}
381
382static bool _jau_file_size(const int fd, const jau::io::fs::file_stats& stats, const off64_t cur_pos, ByteStream::size_type& len) noexcept {
383 if( stats.has( jau::io::fs::file_stats::field_t::size ) ) {
384 len = stats.size();
385 return true;
386 }
387 if( !stats.is_file() ) {
388 return false; // possible pipe or stdin etc
389 }
390 // should have been covered by stats, but try traditionally harder
391 const off64_t end = __posix_lseek64(fd, 0, SEEK_END);
392 if( 0 > end ) {
393 return false;
394 }
395 const off64_t cur_pos2 = __posix_lseek64(fd, cur_pos, SEEK_SET);
396 if( cur_pos2 != cur_pos ) {
397 jau_DBG_PRINT("ByteInStream_File::file_size: Error rewinding to current position failed, orig-pos %" PRIi64 " -> %" PRIi64 ", errno %d %s.",
398 cur_pos, cur_pos2,
399 (int)errno, strerror(errno));
400 return false;
401 }
402 len = ByteStream::size_type(end) + 1_u64;
403 return true;
404}
405
408 m_stats(fd), m_fd(-1), m_has_content_length(false), m_content_size(0),
409 m_offset(0), m_mark(npos)
410{
411 if( !m_stats.exists() || !m_stats.has_access() ) {
412 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
413 jau_DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", m_stats.toString().c_str(), toString().c_str());
414 return;
415 }
416 m_fd = ::dup(fd);
417 if ( 0 > m_fd ) {
418 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
419 jau_DBG_PRINT("ByteInStream_File::ctor: Error occurred in %s, %s", m_stats.toString().c_str(), toString().c_str());
420 return;
421 }
422 const off64_t cur_pos = __posix_lseek64(m_fd, 0, SEEK_CUR);
423 if( 0 > cur_pos ) {
425 jau_ERR_PRINT("Failed to read position of existing file %s, errno %d %s",
426 toString(), (int)errno, strerror(errno));
427 return;
428 }
429 m_offset = cur_pos;
430 if( _jau_file_size(m_fd, m_stats, cur_pos, m_content_size) ) {
431 m_has_content_length = true;
432 }
433}
434
435ByteStream_File::ByteStream_File(const int dirfd, const std::string& path, iomode_t iomode, const jau::io::fs::fmode_t fmode, lb_endian_t byteOrder) noexcept
436: ByteStream(iomode, byteOrder),
437 m_stats(), m_fd(-1), m_has_content_length(false), m_content_size(0),
438 m_offset(0), m_mark(npos)
439{
441 // cut of leading `file://`
442 std::string path2 = path.substr(7);
443 m_stats = jau::io::fs::file_stats(dirfd, path2);
444 } else {
445 m_stats = jau::io::fs::file_stats(dirfd, path);
446 }
447 if( !canRead() && !canWrite() ) {
448 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
449 jau_DBG_PRINT("ByteStream_File::ctor: Error, iomode_t invalid: %s, %s", to_string(m_iomode).c_str(), toString().c_str());
450 return;
451 }
452 if( ( m_stats.exists() && !m_stats.is_file() && !m_stats.has_fd() ) || !m_stats.has_access() ) {
453 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ofstream open (?)
454 jau_DBG_PRINT("ByteStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", m_stats.toString().c_str(), toString().c_str());
455 return;
456 }
457 if( !canWrite() && !m_stats.exists() ) {
458 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
459 jau_DBG_PRINT("ByteStream_File::ctor: Error, can't open non-existing read-only file in %s, %s", m_stats.toString().c_str(), toString().c_str());
460 return;
461 }
462 bool truncated = false;
463 bool just_opened = false;
464 if( m_stats.has_fd() ) {
465 m_fd = ::dup( m_stats.fd() );
466 } else {
467 // O_NONBLOCK, is useless on files and counter to this class logic
468 int flags = O_BINARY|O_NOCTTY;
469 if( canRead() && !canWrite() ) {
470 flags |= O_RDONLY;
471 } else if( !canRead() && canWrite() ) {
472 flags |= O_WRONLY;
473 } else {
474 flags |= O_RDWR;
475 }
476 if( !m_stats.exists() ) {
477 flags |= O_CREAT;
478 } else if( canWrite() && is_set(m_iomode, iomode_t::trunc) ) {
479 flags |= O_TRUNC;
480 truncated = true;
481 }
482 m_fd = __posix_openat64(dirfd, m_stats.path().c_str(), flags, jau::io::fs::posix_protection_bits( fmode ));
483 just_opened = true;
484 }
485 if ( 0 > m_fd ) {
486 addstate_impl( iostate_t::failbit ); // Note: conforming with std::ifstream open
487 jau_DBG_PRINT("ByteInStream_File::ctor: Error while opening %s, %s", m_stats.toString().c_str(), toString().c_str());
488 return;
489 }
490 if( truncated ) {
491 // m_content_size = 0, m_offset = 0
492 return;
493 }
494 const off64_t cur_pos = just_opened || !m_stats.is_file() ? 0 : __posix_lseek64(m_fd, 0, SEEK_CUR);
495 if( 0 > cur_pos ) {
497 jau_ERR_PRINT("Failed to read position of existing file %s, errno %d %s", toString(), (int)errno, strerror(errno));
498 return;
499 }
500 m_offset = cur_pos;
501 if( _jau_file_size(m_fd, m_stats, cur_pos, m_content_size) ) {
502 m_has_content_length = true;
503 }
504 if( m_stats.is_file() && is_set(m_iomode, iomode_t::atend) ) {
505 const off64_t end = __posix_lseek64(m_fd, 0, SEEK_END);
506 if( 0 > end ) {
508 jau_ERR_PRINT("Failed to position existing file to end %s, errno %d %s", toString(), (int)errno, strerror(errno));
509 return;
510 }
511 m_offset = end;
512 }
513}
514
516: ByteStream_File(AT_FDCWD, path, mode, fmode, byteOrder) {}
517
518void ByteStream_File::close() noexcept {
519 if( 0 <= m_fd ) {
520 ::close(m_fd);
521 m_fd = -1;
523 }
524}
525
526void ByteStream_File::flush() noexcept {
527 if( 0 <= m_fd && canWrite() ) {
528 ::fdatasync(m_fd);
529 }
530}
531
532std::string ByteStream_File::toString() const noexcept {
533 return "ByteInStream_File[content_length "+( hasContentSize() ? jau::to_decstring(m_content_size) : "n/a" )+
534 ", consumed "+jau::to_decstring(m_offset)+
535 ", available "+jau::to_decstring(get_available())+
536 ", fd "+std::to_string(m_fd)+
537 ", iomode["+jau::io::to_string(m_iomode)+
538 ", iostate["+jau::io::to_string(rdstate())+
539 "], "+m_stats.toString()+
540 "]";
541}
542
543
546 m_url(std::move(url)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
547 m_stream_resp( read_url_stream_async(nullptr, m_url, /*httpPostReq=*/nullptr, &m_buffer, AsyncStreamConsumerFunc()) ),
548 m_offset(0), m_mark(npos), m_rewindbuf()
549{ }
550
551void ByteInStream_URL::close() noexcept {
552 jau_DBG_PRINT("ByteInStream_URL: close.0 %s, %s", id().c_str(), to_string_int().c_str());
553
554 if( m_stream_resp->processing() ) {
555 m_stream_resp->result = io_result_t::SUCCESS; // signal end of streaming
556 }
557
558 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
559 if( m_stream_resp->thread.joinable() ) {
560 jau_DBG_PRINT("ByteInStream_URL: close.1 %s, %s", id().c_str(), m_buffer.toString().c_str());
561 m_stream_resp->thread.join();
562 }
563 std::thread none;
564 m_stream_resp->thread.swap(none);
565 jau_DBG_PRINT("ByteInStream_URL: close.X %s, %s", id().c_str(), to_string_int().c_str());
566}
567
568bool ByteInStream_URL::available(size_t n) noexcept {
569 if( !good() || !canRead() || !m_stream_resp->processing() ) {
570 // url thread ended, only remaining bytes in buffer available left
571 return m_buffer.size() >= n;
572 }
573 m_stream_resp->header_resp.wait_until_completion(m_timeout);
574 if( m_stream_resp->has_content_length && m_stream_resp->content_length - m_offset < n ) {
575 return false;
576 }
577 // I/O still in progress, we have to poll until data is available or timeout
578 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
579 bool timeout_occured;
580 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
581 if( avail < n ) {
582 if( timeout_occured ) {
584 if( m_stream_resp->processing() ) {
585 m_stream_resp->result = io_result_t::FAILED;
586 }
587 m_buffer.interruptWriter();
588 }
589 return false;
590 } else {
591 return true;
592 }
593}
594
595bool ByteInStream_URL::isOpen() const noexcept {
596 // url thread has not ended or remaining bytes in buffer available left
597 return m_stream_resp->processing() || m_buffer.size() > 0;
598}
599
600bool ByteInStream_URL::hasContentSize() const noexcept {
601 m_stream_resp->header_resp.wait_until_completion(m_timeout);
602 return m_stream_resp->has_content_length;
603}
604
606 m_stream_resp->header_resp.wait_until_completion(m_timeout);
607 const size_type length = contentSize();
608 if( fail() ) {
609 return ByteStream::npos;
610 } else if( m_rewindbuf.covered(m_mark, newPos) ){
612 m_offset = newPos;
613 return m_offset;
614 } else if( !hasContentSize() ) {
615 return ByteStream::npos;
616 } else if( newPos > length ) {
617 return ByteStream::npos;
618 } else if(newPos >= m_offset) {
620 discardRead(newPos - m_offset);
621 return m_offset;
622 } else {
623 jau_DBG_PRINT("ByteInStream_URL::seek newPos %" PRIu64 "< position %" PRIu64 " not implemented", newPos, m_offset);
624 return ByteStream::npos;
625 }
626}
627
628[[nodiscard]] size_t ByteInStream_URL::discard(size_t N) noexcept {
629 return discardRead(N);
630}
631
632[[nodiscard]] bool ByteInStream_URL::setMark(size_type readLimit) noexcept {
633 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
634 m_mark = m_offset;
635 return true;
636 } else {
637 return false;
638 }
639}
640
641[[nodiscard]] bool ByteInStream_URL::seekMark() noexcept {
642 if( npos == m_mark ) {
643 return false;
644 }
645 return m_mark == seek(m_mark);
646}
647
648size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
649 m_stream_resp->header_resp.wait_until_completion(m_timeout);
650 if( 0 == length || !good() || !canRead() ) {
651 return 0;
652 }
653 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
654}
655
656size_t ByteInStream_URL::peek(void* out, size_t length, size_type peek_offset) noexcept {
657 (void)out;
658 (void)length;
659 (void)peek_offset;
660 jau_DBG_PRINT("ByteInStream_URL::peek not implemented");
661 return 0;
662}
663
665 if ( ( !m_stream_resp->processing() && m_buffer.isEmpty() ) ||
666 ( m_stream_resp->has_content_length && m_offset >= m_stream_resp->content_length ) )
667 {
669 }
670 if( m_stream_resp->failed() ) {
672 }
673 return rdstate_impl();
674}
675
676std::string ByteInStream_URL::to_string_int() const noexcept {
677 return m_url+", Url[content_length "+( hasContentSize() ? jau::to_decstring(m_stream_resp->content_length.load()) : "n/a" )+
678 ", xfered "+jau::to_decstring(m_stream_resp->total_read.load())+
679 ", result "+std::to_string((int8_t)m_stream_resp->result.load())+
680 "], consumed "+jau::to_decstring(m_offset)+
681 ", available "+jau::to_decstring(get_available())+
682 ", iomode["+jau::io::to_string(m_iomode)+
683 ", iostate["+jau::io::to_string(rdstate())+
684 "], rewind[mark "+std::to_string(m_mark)+", "+m_rewindbuf.toString()+
685 "], "+m_buffer.toString();
686}
687std::string ByteInStream_URL::toString() const noexcept {
688 return "ByteInStream_URL["+to_string_int()+"]";
689}
690
691std::unique_ptr<ByteStream> jau::io::to_ByteInStream(const std::string& path_or_uri, jau::fraction_i64 timeout) noexcept {
692 if( !jau::io::uri_tk::is_local_file_protocol(path_or_uri) &&
694 {
695 std::unique_ptr<ByteStream> res = std::make_unique<ByteInStream_URL>(path_or_uri, timeout);
696 if( nullptr != res && !res->fail() ) {
697 return res;
698 }
699 }
700 std::unique_ptr<ByteStream> res = std::make_unique<ByteStream_File>(path_or_uri, iomode_t::read);
701 if( nullptr != res && !res->fail() ) {
702 return res;
703 }
704 return nullptr;
705}
706
709 m_id(std::move(id_name)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
710 m_has_content_length( false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result( io::io_result_t::NONE ),
711 m_offset(0)
712{ }
713
715 jau_DBG_PRINT("ByteInStream_Feed: close.0 %s, %s", id().c_str(), toStringInt().c_str());
716
717 if( io_result_t::NONE == m_result ) {
718 m_result = io_result_t::SUCCESS; // signal end of streaming
719 }
720 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
721 jau_DBG_PRINT("ByteInStream_Feed: close.X %s, %s", id().c_str(), toStringInt().c_str());
722}
723
724bool ByteInStream_Feed::available(size_t n) noexcept {
725 if( !good() || !canRead() || io_result_t::NONE != m_result ) {
726 // feeder completed, only remaining bytes in buffer available left
727 return m_buffer.size() >= n;
728 }
729 if( m_has_content_length && m_content_size - m_offset < n ) {
730 return false;
731 }
732 // I/O still in progress, we have to poll until data is available or timeout
733 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
734 bool timeout_occured;
735 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
736 if( avail < n ) {
737 if( timeout_occured ) {
739 if( io_result_t::NONE == m_result ) {
740 m_result = io_result_t::FAILED;
741 }
742 m_buffer.interruptWriter();
743 }
744 return false;
745 } else {
746 return true;
747 }
748}
749
750bool ByteInStream_Feed::isOpen() const noexcept {
751 // feeder has not ended or remaining bytes in buffer available left
752 return io_result_t::NONE == m_result || m_buffer.size() > 0;
753}
754
756 const size_type length = m_content_size;
757 if( fail() ) {
758 return ByteStream::npos;
759 } else if( m_rewindbuf.covered(m_mark, newPos) ){
761 m_offset = newPos;
762 return m_offset;
763 } else if( !hasContentSize() ) {
764 return ByteStream::npos;
765 } else if( newPos > length ) {
766 return ByteStream::npos;
767 } else if(newPos >= m_offset) {
769 discardRead(newPos - m_offset);
770 return m_offset;
771 } else {
772 jau_DBG_PRINT("ByteInStream_Feed::seek newPos %" PRIu64 "< position %" PRIu64 " not implemented", newPos, m_offset);
773 return ByteStream::npos;
774 }
775}
776
777[[nodiscard]] size_t ByteInStream_Feed::discard(size_t N) noexcept {
778 return discardRead(N);
779}
780
781[[nodiscard]] bool ByteInStream_Feed::setMark(size_type readLimit) noexcept {
782 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
783 m_mark = m_offset;
784 return true;
785 } else {
786 return false;
787 }
788}
789
790[[nodiscard]] bool ByteInStream_Feed::seekMark() noexcept {
791 if( npos == m_mark ) {
792 return false;
793 }
794 return m_mark == seek(m_mark);
795}
796
797size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
798 if( 0 == length || !good() || !canRead() ) {
799 return 0;
800 }
801 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
802}
803
804size_t ByteInStream_Feed::peek(void* out, size_t length, size_type peek_offset) noexcept {
805 (void)out;
806 (void)length;
807 (void)peek_offset;
808 jau_DBG_PRINT("ByteInStream_Feed::peek not implemented");
809 return 0;
810}
811
813 if ( ( io_result_t::NONE != m_result && m_buffer.isEmpty() ) ||
814 ( m_has_content_length && m_offset >= m_content_size ) )
815 {
817 }
818 if( io_result_t::FAILED == m_result ) {
820 }
821 return rdstate_impl();
822}
823
824size_t ByteInStream_Feed::write(const void* in, size_t length, const jau::fraction_i64& timeout) noexcept {
825 if( 0 < length && canWrite() && good() && io_result_t::NONE == m_result ) { // feeder still running
826 bool timeout_occured;
827 const uint8_t *in8 = reinterpret_cast<const uint8_t*>(in);
828 if( m_buffer.putBlocking(in8, in8+length, timeout, timeout_occured) ) {
829 m_total_xfered.fetch_add(length);
830 return length;
831 } else {
832 if( timeout_occured ) {
834 m_buffer.interruptWriter();
835 } else {
837 }
838 if( io_result_t::NONE == m_result ) {
839 m_result = io_result_t::FAILED;
840 }
841 return 0;
842 }
843 } else {
844 return 0;
845 }
846}
847
848void ByteInStream_Feed::setEOF(const io_result_t result) noexcept {
849 m_result = result;
850 m_buffer.set_end_of_input(true); // still considering last data, also irqs blocking ringbuffer reader
851}
852
853std::string ByteInStream_Feed::toStringInt() const noexcept {
854 return m_id+", ext[content_length "+( hasContentSize() ? jau::to_decstring(m_content_size.load()) : "n/a" )+
855 ", xfered "+jau::to_decstring(m_total_xfered.load())+
856 ", result "+std::to_string((int8_t)m_result.load())+
857 "], consumed "+std::to_string(m_offset)+
858 ", available "+std::to_string(getAvailable())+
859 ", iomode["+jau::io::to_string(m_iomode)+
860 ", iostate["+jau::io::to_string(rdstate())+
861 "], "+m_buffer.toString();
862}
863
864std::string ByteInStream_Feed::toString() const noexcept {
865 return "ByteInStream_Feed["+toStringInt()+"]";
866}
867
870 m_parent.close();
871 jau_DBG_PRINT("ByteInStream_Recorder: close.X %s", id().c_str());
872}
873
875 m_buffer.resize(0);
876 m_rec_offset = m_offset;
877 m_is_recording = true;
878}
879
881 m_is_recording = false;
882}
883
885 m_is_recording = false;
886 m_buffer.clear();
887 m_rec_offset = 0;
888}
889
890size_t ByteStream_Recorder::read(void* out, size_t length) noexcept {
891 const size_t consumed_bytes = m_parent.read(out, length);
892 m_offset += consumed_bytes;
893 if( isRecording() && consumed_bytes > 0 ) {
894 uint8_t* out_u8 = static_cast<uint8_t*>(out);
895 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
896 }
897 return consumed_bytes;
898}
899
900size_t ByteStream_Recorder::write(const void* out, size_t length) noexcept {
901 const size_t consumed_bytes = m_parent.write(out, length);
902 m_offset += consumed_bytes;
903 if( isRecording() && consumed_bytes > 0 ) {
904 const uint8_t* out_u8 = reinterpret_cast<const uint8_t*>(out);
905 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
906 }
907 return consumed_bytes;
908}
909
910std::string ByteStream_Recorder::toString() const noexcept {
911 return "ByteInStream_Recorder[parent "+m_parent.id()+", recording[on "+std::to_string(m_is_recording)+
912 " offset "+jau::to_decstring(m_rec_offset)+
913 "], consumed "+jau::to_decstring(m_offset)+
914 ", iomode["+jau::io::to_string(m_iomode)+
915 ", iostate["+jau::io::to_string(rdstate())+"]]";
916}
917
918std::string jau::io::to_string(const ioaccess_t v) noexcept {
919 return v == ioaccess_t::read ? "read" : "write";
920}
static constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
static bool _jau_file_size(const int fd, const jau::io::fs::file_stats &stats, const off64_t cur_pos, ByteStream::size_type &len) noexcept
#define O_BINARY
#define __posix_lseek64
#define __posix_openat64
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
std::string toString() const noexcept override
void setEOF(const io_result_t result) noexcept
Set end-of-data (EOS), i.e.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Not implemented, returns 0.
iostate_t rdstate() const noexcept override
Returns the current state flags.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t write(const void *in, size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
size_t read(void *out, size_t length) noexcept override
Read from the source.
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
std::string toString() const noexcept override
bool isOpen() const noexcept override
Checks if the stream has an associated file.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate_t rdstate() const noexcept override
Returns the current state flags.
size_t read(void *out, size_t length) noexcept override
Read from the source.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool hasContentSize() const noexcept override
Returns true if the implementation is aware of contentSize(), otherwise false.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_type contentSize() const noexcept override
Returns the content_size if known.
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to the current position, allowing the stream to return to it via seekMark().
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
bool hasContentSize() const noexcept override
Returns true if the implementation is aware of contentSize(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void flush() noexcept override
Synchronizes all output operations, or do nothing.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
int fd() const noexcept
Returns the file descriptor if isOpen(), otherwise -1 for no file descriptor.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
std::string toString() const noexcept override
ByteStream_File(const std::string &path, iomode_t iomode=iomode_t::rw, const jau::io::fs::fmode_t fmode=fs::fmode_t::def_file_prot, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a stream based byte stream from filesystem path, either an existing or new file.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if hasContentSize() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
std::string toString() const noexcept override
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void startRecording() noexcept
Starts the recording.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
void stopRecording() noexcept
Stops the recording.
size_t read(void *, size_t) noexcept override
Read from the source.
iostate_t rdstate() const noexcept override
Returns the current state flags.
void clearRecording() noexcept
Clears the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string toString() const noexcept override
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to the current position, allowing the stream to return to it via seekMark().
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if hasContentSize() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
ByteStream_SecMemory(const std::string &in, lb_endian_t byteOrder=lb_endian_t::little)
Construct a secure memory source that reads from a string, iomode_t::read.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
constexpr bool canWrite() const noexcept
Returns true in case stream has iomode::write capabilities.
constexpr bool canRead() const noexcept
Returns true in case stream has iomode::read capabilities.
size_t discardRead(size_t n) noexcept
Fallback slow discard implementation using read() in case of unknown stream size.
constexpr lb_endian_t byteOrder() const noexcept
Returns endian byte-order of stream storage.
uint64_t size_type
uint64_t size data type, bit position and count
ByteStream(iomode_t mode, lb_endian_t byteOrder=lb_endian_t::little) noexcept
constexpr iomode_t mode() const noexcept
static constexpr size_type npos
Invalid position constant, denoting unset mark() or invalid position.
bool fail() const noexcept
Checks if an error has occurred.
virtual iostate_t rdstate() const noexcept
Returns the current state flags.
void clearStateFlags(const iostate_t clr) noexcept
Clears given state flags from current value.
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
constexpr iostate_t rdstate_impl() const noexcept
constexpr void addstate_impl(iostate_t state) const noexcept
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
#define jau_DBG_PRINT(fmt,...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition debug.hpp:100
#define jau_ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition debug.hpp:150
lb_endian_t
Simplified reduced endian type only covering little- and big-endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
#define PRAGMA_DISABLE_WARNING_TYPE_RANGE_LIMIT
constexpr bool is_set(const E mask, const E bits) noexcept
#define PRAGMA_DISABLE_WARNING_PUSH
#define PRAGMA_DISABLE_WARNING_POP
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
Definition io_util.cpp:1121
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition io_util.cpp:206
iomode_t
Stream I/O mode, e.g.
std::string to_string(const ioaccess_t v) noexcept
Return std::string representation of the given ioaccess_t.
io_result_t
I/O operation result value.
Definition io_util.hpp:55
ioaccess_t
I/O read or write access.
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
Definition io_util.hpp:356
std::string toString(io_result_t v) noexcept
Definition io_util.hpp:67
std::unique_ptr< ByteStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of the given uri matches a network protocol supported by libcurl, otherwis...
Definition io_util.cpp:196
@ rw
Read and write capabilities, i.e.
@ atend
Seek to end of (file) stream when opened.
@ trunc
Truncate existing (file) stream when opened with write.
@ read
Read capabilities.
@ NONE
Operation still in progress.
Definition io_util.hpp:60
@ FAILED
Operation failed.
Definition io_util.hpp:57
@ SUCCESS
Operation succeeded.
Definition io_util.hpp:63
@ read
Read intent.
@ none
No error occurred nor has EOS been reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream (EOS).
@ timeout
Input or output operation failed due to timeout.
std::string to_decstring(const value_type &v, const char separator='\'', const nsize_t min_width=0) noexcept
Produces a decimal integer string representation of an integral integer value with given radix.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
Definition backtrace.hpp:32
STL namespace.
relaxed_atomic_uint64 content_length
content_length tracking the content_length
Definition io_util.hpp:338
CXX_ALWAYS_INLINE _Tp load() const noexcept