37 #include <sys/types.h>
43 #include <curl/curl.h>
60#if defined(__FreeBSD__)
61 typedef off_t off64_t;
62 #define __posix_openat64 ::openat
63 #define __posix_lseek64 ::lseek
65 #define __posix_openat64 ::openat64
66 #define __posix_lseek64 ::lseek64
75inline constexpr void copy_mem(
void* out,
const void* in,
size_t n)
noexcept {
76 if(in !=
nullptr && out !=
nullptr && n > 0) {
77 std::memmove(out, in,
sizeof(uint8_t)*n);
82 return 1 ==
read(&out, 1);
86 return 1 ==
peek(&out, 1, 0);
90 uint8_t buf[1024] = { 0 };
95 const size_t got =
read(buf, std::min(n,
sizeof(buf)));
107 if( 0 == length || !
good() ) {
110 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
111 copy_mem(out, m_source.data() + m_offset, got);
113 if( m_source.size() == m_offset ) {
120 return m_source.size() - m_offset >= n;
124 const size_t bytes_left = m_source.size() - m_offset;
125 if(peek_offset >= bytes_left) {
128 const size_t got = std::min(bytes_left - peek_offset, length);
129 copy_mem(out, &m_source[m_offset + peek_offset], got);
146 return "ByteInStream_SecMemory[content size "+
jau::to_decstring(m_source.size())+
149 ", iostate["+jau::io::to_string(
rdstate())+
154 if( 0 == length || !
good() ) {
157 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
159 while( total < length ) {
161 while ( ( len =
::read(m_fd, out_u8+total, length-total) ) < 0 ) {
162 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
171 DBG_PRINT(
"ByteInStream_File::read: Error occurred in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
174 total +=
static_cast<size_t>(len);
175 if( 0 == len || ( m_has_content_length && m_bytes_consumed + total >= m_content_size ) ) {
180 m_bytes_consumed += total;
185 if( 0 == length || !
good() ||
186 (
sizeof(
size_t) >=
sizeof(off64_t) && offset > std::numeric_limits<off64_t>::max() ) ||
187 ( m_has_content_length && m_content_size - m_bytes_consumed < offset + 1 ) ) {
194 abs_pos =
__posix_lseek64(m_fd,
static_cast<off64_t
>(offset), SEEK_CUR);
197 DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset1 %zd) in %s, errno %d %s",
198 offset,
to_string().c_str(), errno, strerror(errno));
202 if( abs_pos ==
static_cast<off64_t
>(offset) ) {
204 while ( ( len =
::read(m_fd, out, length) ) < 0 ) {
205 if ( errno == EAGAIN || errno == EINTR ) {
211 DBG_PRINT(
"ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
216 if(
__posix_lseek64(m_fd,
static_cast<off64_t
>(m_bytes_consumed), SEEK_SET) < 0 ) {
219 DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset2 %zd) in %s, errno %d %s",
220 offset,
to_string().c_str(), errno, strerror(errno));
227 return is_open() &&
good() && ( !m_has_content_length || m_content_size - m_bytes_consumed >= (uint64_t)n );
232 stats(
fd), m_fd(-1), m_has_content_length(
false), m_content_size(0), m_bytes_consumed(0)
234 if( !stats.exists() || !stats.has_access() ) {
236 DBG_PRINT(
"ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
239 m_content_size = m_has_content_length ? stats.size() : 0;
243 DBG_PRINT(
"ByteInStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(),
to_string().c_str());
250 stats(), m_fd(-1), m_has_content_length(
false), m_content_size(0), m_bytes_consumed(0)
254 std::string path2 = path.substr(7);
259 if( !stats.exists() || !stats.has_access() ) {
261 DBG_PRINT(
"ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
264 m_has_content_length =
true;
265 m_content_size = stats.size();
267 m_has_content_length =
false;
271 const int src_flags = O_RDONLY|
O_BINARY|O_NOCTTY;
272 if( stats.has_fd() ) {
273 m_fd = ::dup( stats.fd() );
279 DBG_PRINT(
"ByteInStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(),
to_string().c_str());
299 ", fd "+std::to_string(m_fd)+
300 ", iostate["+jau::io::to_string(
rdstate())+
301 "], "+stats.to_string()+
308 m_header_sync(), m_has_content_length(
false ), m_content_size( 0 ),
313 m_url_thread =
read_url_stream(m_url, m_buffer, m_header_sync, m_has_content_length, m_content_size, m_total_xfered, m_result);
314 if(
nullptr == m_url_thread ) {
321 DBG_PRINT(
"ByteInStream_URL: close.0 %s, %s",
id().c_str(), to_string_int().c_str());
327 m_buffer.close(
true );
328 if(
nullptr != m_url_thread && m_url_thread->joinable() ) {
329 DBG_PRINT(
"ByteInStream_URL: close.1 %s, %s",
id().c_str(), m_buffer.toString().c_str());
330 m_url_thread->join();
332 m_url_thread =
nullptr;
333 DBG_PRINT(
"ByteInStream_URL: close.X %s, %s",
id().c_str(), to_string_int().c_str());
339 return m_buffer.size() >= n;
341 m_header_sync.wait_until_completion(m_timeout);
342 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
347 bool timeout_occured;
348 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
350 if( timeout_occured ) {
355 m_buffer.interruptWriter();
369 m_header_sync.wait_until_completion(m_timeout);
370 if( 0 == length || !
good() ) {
373 bool timeout_occured =
false;
375 const size_t got = m_buffer.getBlocking(
static_cast<uint8_t*
>(out), length, 1, m_timeout, timeout_occured);
376 m_bytes_consumed += got;
377 if( timeout_occured ) {
382 m_buffer.interruptWriter();
392 ERR_PRINT(
"ByteInStream_URL::peek not implemented");
398 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
408std::string ByteInStream_URL::to_string_int() const noexcept {
415 "], "+m_buffer.toString();
418 return "ByteInStream_URL["+to_string_int()+
"]";
425 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_URL>(path_or_uri,
timeout);
426 if(
nullptr != res && !res->fail() ) {
430 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_File>(path_or_uri);
431 if(
nullptr != res && !res->fail() ) {
444 DBG_PRINT(
"ByteInStream_Feed: close.0 %s, %s",
id().c_str(), to_string_int().c_str());
449 m_buffer.close(
true );
450 DBG_PRINT(
"ByteInStream_Feed: close.X %s, %s",
id().c_str(), to_string_int().c_str());
456 return m_buffer.size() >= n;
458 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
463 bool timeout_occured;
464 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
466 if( timeout_occured ) {
471 m_buffer.interruptWriter();
485 if( 0 == length || !
good() ) {
488 bool timeout_occured =
false;
490 const size_t got = m_buffer.getBlocking(
static_cast<uint8_t*
>(out), length, 1, m_timeout, timeout_occured);
491 m_bytes_consumed += got;
492 if( timeout_occured ) {
497 m_buffer.interruptWriter();
507 ERR_PRINT(
"ByteInStream_Feed::peek not implemented");
513 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
525 bool timeout_occured;
526 if( m_buffer.putBlocking(in, in+length,
timeout, timeout_occured) ) {
527 m_total_xfered.fetch_add(length);
530 if( timeout_occured ) {
532 m_buffer.interruptWriter();
548 m_buffer.set_end_of_input(
true);
551std::string ByteInStream_Feed::to_string_int() const noexcept {
558 "], "+m_buffer.toString();
562 return "ByteInStream_Feed["+to_string_int()+
"]";
568 DBG_PRINT(
"ByteInStream_Recorder: close.X %s",
id().c_str());
573 m_rec_offset = m_bytes_consumed;
574 m_is_recording =
true;
578 m_is_recording =
false;
582 m_is_recording =
false;
588 const size_t consumed_bytes = m_parent.read(out, length);
589 m_bytes_consumed += consumed_bytes;
591 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
592 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
594 return consumed_bytes;
598 return "ByteInStream_Recorder[parent "+m_parent.id()+
", recording[on "+std::to_string(m_is_recording)+
601 ", iostate["+jau::io::to_string(
rdstate())+
"]]";
605 return 1 ==
write(&in, 1);
609 if( 0 == length ||
fail() ) {
612 const uint8_t* out_u8 =
static_cast<const uint8_t*
>(out);
614 while( total < length ) {
616 while ( ( len =
::write(m_fd, out_u8+total, length-total) ) < 0 ) {
617 if ( errno == EAGAIN || errno == EINTR ) {
626 DBG_PRINT(
"ByteOutStream_File::write: Error occurred in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
629 total +=
static_cast<size_t>(len);
635 m_bytes_consumed += total;
640: stats(
fd), m_fd(-1),
643 if( !stats.exists() || !stats.has_access() ) {
645 DBG_PRINT(
"ByteOutStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
650 DBG_PRINT(
"ByteOutStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(),
to_string().c_str());
662 std::string path2 = path.substr(7);
667 if( ( stats.exists() && !stats.is_file() && !stats.has_fd() ) || !stats.has_access() ) {
669 DBG_PRINT(
"ByteOutStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", stats.to_string().c_str(),
to_string().c_str());
672 if( stats.has_fd() ) {
673 m_fd = ::dup( stats.fd() );
675 const int dst_flags = ( stats.exists() ? 0 : O_CREAT|O_EXCL ) | O_WRONLY|
O_BINARY|O_NOCTTY;
680 DBG_PRINT(
"ByteOutStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(),
to_string().c_str());
682 if( stats.is_file() ) {
686 ERR_PRINT(
"Failed to position existing file to end %s, errno %d %s",
687 to_string().c_str(), errno, strerror(errno));
706 ", fd "+std::to_string(m_fd)+
707 ", iostate["+jau::io::to_string(
rdstate())+
708 "], "+stats.to_string()+
constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void set_eof(const async_io_result_t result) noexcept
Set end-of-data (EOS), i.e.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
iostate rdstate() const noexcept override
Returns the current state flags.
bool write(uint8_t in[], size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
std::string to_string() const noexcept override
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
ByteInStream_File(const std::string &path) noexcept
Construct a stream based byte input stream from filesystem path.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void clear_recording() noexcept
Clears the recording.
iostate rdstate() const noexcept override
Returns the current state flags.
void start_recording() noexcept
Starts the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
bool is_recording() noexcept
size_t read(void *, size_t) noexcept override
Read from the source.
std::string to_string() const noexcept override
void stop_recording() noexcept
Stops the recording.
ByteInStream_SecMemory(const std::string &in)
Construct a secure memory source that reads from a string.
std::string to_string() const noexcept override
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
size_t read(void *, size_t) noexcept override
Read from the source.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate rdstate() const noexcept override
Returns the current state flags.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool is_open() const noexcept override
Checks if the stream has an associated file.
virtual size_t peek(void *out, size_t length, size_t peek_offset) noexcept=0
Read from the source but do not modify the internal offset.
size_t discard(size_t N) noexcept
Discard the next N bytes of the data.
virtual size_t read(void *out, size_t length) noexcept=0
Read from the source.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
ByteOutStream_File(const std::string &path, const jau::fs::fmode_t mode=jau::fs::fmode_t::def_file_prot) noexcept
Construct a stream based byte output stream from filesystem path, either an existing or new file.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
std::string to_string() const noexcept override
virtual size_t write(const void *in, size_t length) noexcept=0
Write to the data sink.
bool fail() const noexcept
Checks if an error has occurred.
constexpr iostate rdstate_impl() const noexcept
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
virtual iostate rdstate() const noexcept
Returns the current state flags.
constexpr void setstate_impl(iostate state) const noexcept
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
constexpr E & write(E &store, const E bits, bool set) noexcept
If set==true, sets the bits in store, i.e.
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
std::unique_ptr< ByteInStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
async_io_result_t
Asynchronous I/O operation result value.
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream.
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
CXX_ALWAYS_INLINE _Tp load() const noexcept