43 #include <sys/types.h>
49 #include <curl/curl.h>
66#if defined(__FreeBSD__)
67 typedef off_t off64_t;
68 #define __posix_openat64 ::openat
69 #define __posix_lseek64 ::lseek
71 #define __posix_openat64 ::openat64
72 #define __posix_lseek64 ::lseek64
81inline constexpr static void copy_mem(
void* out,
const void* in,
size_t n)
noexcept {
82 if(in !=
nullptr && out !=
nullptr && n > 0) {
83 std::memcpy(out, in,
sizeof(uint8_t)*n);
91 uint8_t buf[1024] = { 0 };
96 const size_t got =
read(buf, std::min(n,
sizeof(buf)));
111 }
else if( newPos > m_source.size() || newPos > std::numeric_limits<size_t>::max() ) {
116 m_offset =
static_cast<size_t>(newPos);
117 if( m_mark > newPos ) {
120 if( m_source.size() == m_offset ) {
130 size_t n = std::min(N, m_source.size() - m_offset);
132 if( m_source.size() == m_offset ) {
144 if(
npos == m_mark ) {
147 return m_mark ==
seek(m_mark);
154 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
155 copy_mem(out, m_source.data() + m_offset, got);
157 if( m_source.size() == m_offset ) {
164 return canRead() && m_source.size() - m_offset >= n;
170 const size_t bytes_left = m_source.size() - m_offset;
172 ( peek_offset > std::numeric_limits<size_t>::max() ) ||
173 ( bytes_left < peek_offset + 1 ) ) {
177 const size_t po_sz =
static_cast<size_t>(peek_offset);
178 const size_t peek_len = std::min(bytes_left - po_sz, length);
179 copy_mem(out, &m_source[m_offset + po_sz], peek_len);
187 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
188 copy_mem(m_source.data() + m_offset, out, got);
197 m_offset(0), m_mark(
npos)
207 return "ByteInStream_SecMemory[content size "+
jau::to_decstring(m_source.size())+
217 !m_has_content_length ||
218 ( !
canWrite() && newPos > m_content_size ) ||
219 newPos > std::numeric_limits<off64_t>::max() )
224 if( newPos != m_offset ) {
225 off64_t abs_pos =
__posix_lseek64(m_fd,
static_cast<off64_t
>(newPos), SEEK_SET);
232 if( m_mark > m_offset ) {
235 if( m_content_size == m_offset ) {
244 if( m_has_content_length ) {
248 const size_t n = std::min<size_t>(N, m_content_size - m_offset);
249 const size_t p0 = m_offset;
250 return seek(p0 + n) - p0;
262 if(
npos == m_mark ) {
265 return m_mark ==
seek(m_mark);
272 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
274 while( total < length ) {
276 while ( ( len =
::read(m_fd, out_u8+total, length-total) ) < 0 ) {
277 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
286 jau_DBG_PRINT(
"ByteInStream_File::read: Error occurred in %s, errno %d %s",
toString(), errno, strerror(errno));
289 total +=
static_cast<size_t>(len);
290 if( 0 == len || ( m_has_content_length && m_offset + total >= m_content_size ) ) {
300 const size_type bytes_left = ( m_has_content_length ? m_content_size : std::numeric_limits<off64_t>::max() ) - m_offset;
302 ( peek_offset > std::numeric_limits<off64_t>::max() ) ||
303 ( bytes_left < peek_offset + 1 ) ) {
309 if( 0 < peek_offset ) {
310 abs_pos =
__posix_lseek64(m_fd,
static_cast<off64_t
>(peek_offset), SEEK_CUR);
313 jau_DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset1 %" PRIu64
") in %s, errno %d %s",
314 peek_offset,
toString(), errno, strerror(errno));
318 if( abs_pos ==
static_cast<off64_t
>(peek_offset) ) {
320 while ( ( len =
::read(m_fd, out, length) ) < 0 ) {
321 if ( errno == EAGAIN || errno == EINTR ) {
327 jau_DBG_PRINT(
"ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s",
toString(), errno, strerror(errno));
332 if(
__posix_lseek64(m_fd,
static_cast<off64_t
>(m_offset), SEEK_SET) < 0 ) {
335 jau_DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset2 %" PRIu64
") in %s, errno %d %s",
336 peek_offset,
toString(), errno, strerror(errno));
350 const uint8_t* out_u8 =
static_cast<const uint8_t*
>(out);
352 while( total < length ) {
354 while ( ( len =
::write(m_fd, out_u8+total, length-total) ) < 0 ) {
355 if ( errno == EAGAIN || errno == EINTR ) {
364 jau_DBG_PRINT(
"ByteOutStream_File::write: Error occurred in %s, errno %d %s",
toString(), errno, strerror(errno));
367 total +=
static_cast<size_t>(len);
374 if( m_has_content_length && m_offset > m_content_size ) {
375 m_content_size = m_offset;
386 if( !stats.is_file() ) {
395 if( cur_pos2 != cur_pos ) {
396 jau_DBG_PRINT(
"ByteInStream_File::file_size: Error rewinding to current position failed, orig-pos %" PRIi64
" -> %" PRIi64
", errno %d %s.",
397 cur_pos, cur_pos2, errno, strerror(errno));
406 m_stats(
fd), m_fd(-1), m_has_content_length(
false), m_content_size(0),
407 m_offset(0), m_mark(
npos)
409 if( !m_stats.exists() || !m_stats.has_access() ) {
411 jau_DBG_PRINT(
"ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", m_stats.toString(),
toString());
423 jau_ERR_PRINT(
"Failed to read position of existing file %s, errno %d %s",
toString(), errno, strerror(errno));
428 m_has_content_length =
true;
434 m_stats(), m_fd(-1), m_has_content_length(
false), m_content_size(0),
435 m_offset(0), m_mark(
npos)
439 std::string path2 = path.substr(7);
449 if( ( m_stats.exists() && !m_stats.is_file() && !m_stats.has_fd() ) || !m_stats.has_access() ) {
451 jau_DBG_PRINT(
"ByteStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", m_stats.toString(),
toString());
454 if( !
canWrite() && !m_stats.exists() ) {
456 jau_DBG_PRINT(
"ByteStream_File::ctor: Error, can't open non-existing read-only file in %s, %s", m_stats.toString(),
toString());
459 bool truncated =
false;
460 bool just_opened =
false;
461 if( m_stats.has_fd() ) {
462 m_fd = ::dup( m_stats.fd() );
473 if( !m_stats.exists() ) {
484 jau_DBG_PRINT(
"ByteInStream_File::ctor: Error while opening %s, %s", m_stats.toString(),
toString());
491 const off64_t cur_pos = just_opened || !m_stats.is_file() ? 0 :
__posix_lseek64(m_fd, 0, SEEK_CUR);
494 jau_ERR_PRINT(
"Failed to read position of existing file %s, errno %d %s",
toString(), errno, strerror(errno));
499 m_has_content_length =
true;
505 jau_ERR_PRINT(
"Failed to position existing file to end %s, errno %d %s",
toString(), errno, strerror(errno));
533 ", fd "+std::to_string(m_fd)+
536 "], "+m_stats.toString()+
545 m_offset(0), m_mark(
npos), m_rewindbuf()
549 jau_DBG_PRINT(
"ByteInStream_URL: close.0 %s, %s",
id(), to_string_int());
551 if( m_stream_resp->processing() ) {
555 m_buffer.close(
true );
556 if( m_stream_resp->thread.joinable() ) {
557 jau_DBG_PRINT(
"ByteInStream_URL: close.1 %s, %s",
id(), m_buffer.toString());
558 m_stream_resp->thread.join();
561 m_stream_resp->thread.swap(
none);
562 jau_DBG_PRINT(
"ByteInStream_URL: close.X %s, %s",
id(), to_string_int());
566 if( !
good() || !
canRead() || !m_stream_resp->processing() ) {
568 return m_buffer.size() >= n;
570 m_stream_resp->header_resp.wait_until_completion(m_timeout);
571 if( m_stream_resp->has_content_length && m_stream_resp->content_length - m_offset < n ) {
576 bool timeout_occured;
577 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
579 if( timeout_occured ) {
581 if( m_stream_resp->processing() ) {
584 m_buffer.interruptWriter();
594 return m_stream_resp->processing() || m_buffer.size() > 0;
598 m_stream_resp->header_resp.wait_until_completion(m_timeout);
599 return m_stream_resp->has_content_length;
603 m_stream_resp->header_resp.wait_until_completion(m_timeout);
607 }
else if( m_rewindbuf.covered(m_mark, newPos) ){
613 }
else if( newPos > length ) {
615 }
else if(newPos >= m_offset) {
620 jau_DBG_PRINT(
"ByteInStream_URL::seek newPos %" PRIu64
"< position %" PRIu64
" not implemented", newPos, m_offset);
630 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
639 if(
npos == m_mark ) {
642 return m_mark ==
seek(m_mark);
646 m_stream_resp->header_resp.wait_until_completion(m_timeout);
650 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
662 if ( ( !m_stream_resp->processing() && m_buffer.isEmpty() ) ||
663 ( m_stream_resp->has_content_length && m_offset >= m_stream_resp->content_length ) )
667 if( m_stream_resp->failed() ) {
673std::string ByteInStream_URL::to_string_int() const noexcept {
676 ", result "+
std::
to_string((int8_t)m_stream_resp->result.load())+
685 return "ByteInStream_URL["+to_string_int()+
"]";
692 std::unique_ptr<ByteStream> res = std::make_unique<ByteInStream_URL>(path_or_uri,
timeout);
693 if(
nullptr != res && !res->fail() ) {
697 std::unique_ptr<ByteStream> res = std::make_unique<ByteStream_File>(path_or_uri,
iomode_t::read);
698 if(
nullptr != res && !res->fail() ) {
707 m_has_content_length(
false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result(
io::io_result_t::NONE ),
712 jau_DBG_PRINT(
"ByteInStream_Feed: close.0 %s, %s",
id(), toStringInt());
717 m_buffer.close(
true );
718 jau_DBG_PRINT(
"ByteInStream_Feed: close.X %s, %s",
id(), toStringInt());
724 return m_buffer.size() >= n;
726 if( m_has_content_length && m_content_size - m_offset < n ) {
731 bool timeout_occured;
732 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
734 if( timeout_occured ) {
739 m_buffer.interruptWriter();
756 }
else if( m_rewindbuf.covered(m_mark, newPos) ){
762 }
else if( newPos > length ) {
764 }
else if(newPos >= m_offset) {
769 jau_DBG_PRINT(
"ByteInStream_Feed::seek newPos %" PRIu64
"< position %" PRIu64
" not implemented", newPos, m_offset);
779 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
788 if(
npos == m_mark ) {
791 return m_mark ==
seek(m_mark);
798 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
811 ( m_has_content_length && m_offset >= m_content_size ) )
823 bool timeout_occured;
824 const uint8_t *in8 =
reinterpret_cast<const uint8_t*
>(in);
825 if( m_buffer.putBlocking(in8, in8+length,
timeout, timeout_occured) ) {
826 m_total_xfered.fetch_add(length);
829 if( timeout_occured ) {
831 m_buffer.interruptWriter();
847 m_buffer.set_end_of_input(
true);
850std::string ByteInStream_Feed::toStringInt() const noexcept {
862 return "ByteInStream_Feed["+toStringInt()+
"]";
873 m_rec_offset = m_offset;
874 m_is_recording =
true;
878 m_is_recording =
false;
882 m_is_recording =
false;
888 const size_t consumed_bytes = m_parent.read(out, length);
889 m_offset += consumed_bytes;
891 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
892 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
894 return consumed_bytes;
898 const size_t consumed_bytes = m_parent.write(out, length);
899 m_offset += consumed_bytes;
901 const uint8_t* out_u8 =
reinterpret_cast<const uint8_t*
>(out);
902 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
904 return consumed_bytes;
908 return "ByteInStream_Recorder[parent "+m_parent.id()+
", recording[on "+std::to_string(m_is_recording)+
static constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
static bool _jau_file_size(const int fd, const jau::io::fs::file_stats &stats, const off64_t cur_pos, ByteStream::size_type &len) noexcept
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool setMark(size_type readLimit) noexcept override
Set markpos to the current position, allowing the stream to return to it later via seekMark().
std::string toString() const noexcept override
void setEOF(const io_result_t result) noexcept
Set end-of-data (EOS), i.e.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Not implemented, returns 0.
iostate_t rdstate() const noexcept override
Returns the current state flags.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t write(const void *in, size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
size_t read(void *out, size_t length) noexcept override
Read from the source.
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
std::string toString() const noexcept override
bool isOpen() const noexcept override
Checks if the stream has an associated file.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate_t rdstate() const noexcept override
Returns the current state flags.
size_t read(void *out, size_t length) noexcept override
Read from the source.
bool setMark(size_type readLimit) noexcept override
Set markpos to the current position, allowing the stream to return to it later via seekMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_type contentSize() const noexcept override
Returns the content_size if known.
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to the current position, allowing the stream to return to it later via seekMark().
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void flush() noexcept override
Synchronizes all output operations, or do nothing.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
std::string toString() const noexcept override
ByteStream_File(const std::string &path, iomode_t iomode=iomode_t::rw, const jau::io::fs::fmode_t fmode=fs::fmode_t::def_file_prot, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a stream based byte stream from filesystem path, either an existing or new file.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
std::string toString() const noexcept override
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void startRecording() noexcept
Starts the recording.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
bool isRecording() noexcept
void stopRecording() noexcept
Stops the recording.
size_t read(void *, size_t) noexcept override
Read from the source.
iostate_t rdstate() const noexcept override
Returns the current state flags.
void clearRecording() noexcept
Clears the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string toString() const noexcept override
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to the current position, allowing the stream to return to it later via seekMark().
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
ByteStream_SecMemory(const std::string &in, lb_endian_t byteOrder=lb_endian_t::little)
Construct a secure memory source that reads from a string, iomode_t::read.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
constexpr bool canWrite() const noexcept
Returns true in case stream has iomode::write capabilities.
constexpr bool canRead() const noexcept
Returns true in case stream has iomode::read capabilities.
size_t discardRead(size_t n) noexcept
Fallback slow discard implementation using read() in case of unknown stream size.
constexpr lb_endian_t byteOrder() const noexcept
Returns endian byte-order of stream storage.
uint64_t size_type
uint64_t size data type, bit position and count
ByteStream(iomode_t mode, lb_endian_t byteOrder=lb_endian_t::little) noexcept
constexpr iomode_t mode() const noexcept
static constexpr size_type npos
Invalid position constant, denoting unset mark() or invalid position.
bool fail() const noexcept
Checks if an error has occurred.
virtual iostate_t rdstate() const noexcept
Returns the current state flags.
void clearStateFlags(const iostate_t clr) noexcept
Clears given state flags from current value.
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
constexpr iostate_t rdstate_impl() const noexcept
constexpr void addstate_impl(iostate_t state) const noexcept
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
#define jau_DBG_PRINT(fmt,...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
#define jau_ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
lb_endian_t
Simplified reduced endian type only covering little- and big-endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
#define PRAGMA_DISABLE_WARNING_TYPE_RANGE_LIMIT
constexpr bool is_set(const E mask, const E bits) noexcept
#define PRAGMA_DISABLE_WARNING_PUSH
#define PRAGMA_DISABLE_WARNING_POP
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
iomode_t
Stream I/O mode, e.g.
std::string to_string(const ioaccess_t v) noexcept
Return std::string representation of the given ioaccess_t.
io_result_t
I/O operation result value.
ioaccess_t
I/O read or write access.
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
std::string toString(io_result_t v) noexcept
std::unique_ptr< ByteStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of the given uri matches one of the network protocols supported by libcurl, otherwise...
@ rw
Read and write capabilities, i.e.
@ atend
Seek to end of (file) stream when opened.
@ trunc
Truncate existing (file) stream when opened with write.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
@ none
No error occurred nor has EOS been reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream (EOS).
@ timeout
Input or output operation failed due to timeout.
std::string to_decstring(const value_type &v, const char separator='\'', const nsize_t min_width=0) noexcept
Produces a decimal string representation of an integral value, using the given separator and minimum width.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
relaxed_atomic_uint64 content_length
content_length tracking the content_length
CXX_ALWAYS_INLINE _Tp load() const noexcept